Skip to content

Commit

Permalink
Add dolphinscheduler-scheduler module (#10360)
Browse files Browse the repository at this point in the history
* Add dolphinscheduler-scheduler module
  • Loading branch information
ruanwenjun authored Jun 4, 2022
1 parent 022e488 commit 2d3be6b
Show file tree
Hide file tree
Showing 27 changed files with 483 additions and 247 deletions.
23 changes: 4 additions & 19 deletions dolphinscheduler-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-zookeeper</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-scheduler-quartz</artifactId>
</dependency>

<dependency>
<groupId>org.codehaus.janino</groupId>
Expand Down Expand Up @@ -121,25 +125,6 @@
<artifactId>spring-context</artifactId>
</dependency>

<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<exclusions>
<exclusion>
<groupId>com.mchange</groupId>
<artifactId>c3p0</artifactId>
</exclusion>
<exclusion>
<groupId>com.mchange</groupId>
<artifactId>mchange-commons-java</artifactId>
</exclusion>
<exclusion>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP-java7</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.service.corn.CronUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.collections.CollectionUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.service.corn.CronUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutor;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;

import org.apache.commons.lang3.StringUtils;

Expand All @@ -60,12 +59,10 @@
import java.util.Map;

import org.quartz.CronExpression;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

Expand Down Expand Up @@ -102,13 +99,11 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
private ProcessDefinitionMapper processDefinitionMapper;

@Autowired
private Scheduler scheduler;
private SchedulerApi schedulerApi;

@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;

@Autowired
private QuartzExecutor quartzExecutor;

/**
* save schedule
Expand Down Expand Up @@ -460,8 +455,7 @@ public Map<String, Object> queryScheduleList(User loginUser, long projectCode) {

public void setSchedule(int projectId, Schedule schedule) {
logger.info("set schedule, project id: {}, scheduleId: {}", projectId, schedule.getId());

quartzExecutor.addJob(ProcessScheduleJob.class, projectId, schedule);
schedulerApi.insertOrUpdateScheduleTask(projectId, schedule);
}

/**
Expand All @@ -474,20 +468,7 @@ public void setSchedule(int projectId, Schedule schedule) {
@Override
public void deleteSchedule(int projectId, int scheduleId) {
logger.info("delete schedules of project id:{}, schedule id:{}", projectId, scheduleId);

String jobName = quartzExecutor.buildJobName(scheduleId);
String jobGroupName = quartzExecutor.buildJobGroupName(projectId);

JobKey jobKey = new JobKey(jobName, jobGroupName);
try {
if (scheduler.checkExists(jobKey)) {
logger.info("Try to delete job: {}, group name: {},", jobName, jobGroupName);
scheduler.deleteJob(jobKey);
}
} catch (SchedulerException e) {
logger.error("Failed to delete job: {}", jobKey);
throw new ServiceException("Failed to delete job: " + jobKey);
}
schedulerApi.deleteScheduleTask(projectId, scheduleId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.scheduler.quartz.QuartzScheduler;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.impl.QuartzExecutorImpl;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -53,7 +54,6 @@
* scheduler service test
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(QuartzExecutorImpl.class)
public class SchedulerServiceTest {

@InjectMocks
Expand All @@ -80,8 +80,8 @@ public class SchedulerServiceTest {
@Mock
private ProjectServiceImpl projectService;

@InjectMocks
private QuartzExecutorImpl quartzExecutors;
@Mock
private SchedulerApi schedulerApi;

@Before
public void setUp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,26 +494,6 @@ private Constants() {
* underline "_"
*/
public static final String UNDERLINE = "_";
/**
* quartz job prifix
*/
public static final String QUARTZ_JOB_PREFIX = "job";
/**
* quartz job group prifix
*/
public static final String QUARTZ_JOB_GROUP_PREFIX = "jobgroup";
/**
* projectId
*/
public static final String PROJECT_ID = "projectId";
/**
* processId
*/
public static final String SCHEDULE_ID = "scheduleId";
/**
* schedule
*/
public static final String SCHEDULE = "schedule";
/**
* application regex
*/
Expand Down
4 changes: 4 additions & 0 deletions dolphinscheduler-master/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-zookeeper</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-scheduler-quartz</artifactId>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.CacheProcessor;
Expand Down Expand Up @@ -77,7 +78,7 @@ public class MasterServer implements IStoppable {
private MasterSchedulerService masterSchedulerService;

@Autowired
private Scheduler scheduler;
private SchedulerApi schedulerApi;

@Autowired
private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
Expand Down Expand Up @@ -154,7 +155,7 @@ public void run() throws SchedulerException {
this.eventExecuteService.start();
this.failoverExecuteThread.start();

this.scheduler.start();
this.schedulerApi.start();

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (Stopper.isRunning()) {
Expand Down Expand Up @@ -188,6 +189,7 @@ public void close(String cause) {
logger.warn("thread sleep exception ", e);
}
// close
this.schedulerApi.close();
this.masterSchedulerService.close();
this.nettyRemotingServer.close();
this.masterRegistryClient.closeRegistry();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.dolphinscheduler.server.master.runner;

import net.bytebuddy.implementation.bytecode.Throw;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
Expand Down Expand Up @@ -71,8 +70,8 @@
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.corn.CronUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;

import org.apache.commons.collections.CollectionUtils;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-scheduler-plugin</artifactId>
<version>dev-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>dolphinscheduler-scheduler-api</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-dao</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.scheduler.api;

import org.apache.dolphinscheduler.dao.entity.Schedule;

/**
* This is the interface for scheduler, contains methods to operate schedule task.
*/
public interface SchedulerApi extends AutoCloseable{

/**
* Start the scheduler, if not start, the scheduler will not execute task.
*
* @throws SchedulerException if start failed.
*/
void start() throws SchedulerException;

/**
* @param projectId project id, the schedule task belongs to.
* @param schedule schedule metadata.
* @throws SchedulerException if insert/update failed.
*/
void insertOrUpdateScheduleTask(int projectId, Schedule schedule) throws SchedulerException;

/**
* Delete a schedule task.
*
* @param projectId project id, the schedule task belongs to.
* @param scheduleId schedule id.
* @throws SchedulerException if delete failed.
*/
void deleteScheduleTask(int projectId, int scheduleId) throws SchedulerException;

/**
* Close the scheduler and release the resource.
*
* @throws SchedulerException if close failed.
*/
void close() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,16 @@
* limitations under the License.
*/

package org.apache.dolphinscheduler.service.quartz;
package org.apache.dolphinscheduler.scheduler.api;

import org.apache.dolphinscheduler.dao.entity.Schedule;
public class SchedulerException extends RuntimeException {

import java.util.Map;
public SchedulerException(String message) {
super(message);
}

import org.quartz.Job;
public SchedulerException(String message, Throwable cause) {
super(message, cause);
}

public interface QuartzExecutor {

/**
* build job name
*/
String buildJobName(int scheduleId);

/**
* build job group name
*/
String buildJobGroupName(int projectId);

/**
* build data map of job detail
*/
Map<String, Object> buildDataMap(int projectId, Schedule schedule);

/**
* add job to quartz
*/
void addJob(Class<? extends Job> clazz, int projectId, final Schedule schedule);
}
Loading

0 comments on commit 2d3be6b

Please sign in to comment.