Skip to content

Commit

Permalink
Add dolphinscheduler-scheduler module (apache#10360)
Browse files Browse the repository at this point in the history
* Add dolphinscheduler-scheduler module
  • Loading branch information
ruanwenjun authored and Tianqi-Dotes committed Jun 16, 2022
1 parent 5c6695d commit 5c38ae9
Show file tree
Hide file tree
Showing 25 changed files with 520 additions and 283 deletions.
6 changes: 6 additions & 0 deletions dolphinscheduler-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,12 @@
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-scheduler-api</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.service.corn.CronUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -89,8 +89,8 @@
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
import static org.apache.dolphinscheduler.common.Constants.COMMA;
import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_TIME_MAX_LENGTH;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,10 @@
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.service.corn.CronUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutor;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.quartz.CronExpression;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -99,13 +95,11 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
private ProcessDefinitionMapper processDefinitionMapper;

@Autowired
private Scheduler scheduler;
private SchedulerApi schedulerApi;

@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;

@Autowired
private QuartzExecutor quartzExecutor;

/**
* save schedule
Expand Down Expand Up @@ -173,7 +167,7 @@ public Map<String, Object> insertSchedule(User loginUser,

scheduleObj.setStartTime(scheduleParam.getStartTime());
scheduleObj.setEndTime(scheduleParam.getEndTime());
if (!CronExpression.isValidExpression(scheduleParam.getCrontab())) {
if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
logger.error("{} verify failure", scheduleParam.getCrontab());

putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, scheduleParam.getCrontab());
Expand Down Expand Up @@ -391,7 +385,7 @@ public Map<String, Object> setScheduleState(User loginUser,
*/
@Override
public Result querySchedule(User loginUser, long projectCode, long processDefineCode, String searchVal,
Integer pageNo, Integer pageSize) {
Integer pageNo, Integer pageSize) {
Result result = new Result();

Project project = projectMapper.queryByCode(projectCode);
Expand All @@ -410,7 +404,7 @@ public Result querySchedule(User loginUser, long projectCode, long processDefine

Page<Schedule> page = new Page<>(pageNo, pageSize);
IPage<Schedule> scheduleIPage = scheduleMapper.queryByProcessDefineCodePaging(page, processDefineCode,
searchVal);
searchVal);

List<ScheduleVo> scheduleList = new ArrayList<>();
for (Schedule schedule : scheduleIPage.getRecords()) {
Expand Down Expand Up @@ -457,8 +451,7 @@ public Map<String, Object> queryScheduleList(User loginUser, long projectCode) {

public void setSchedule(int projectId, Schedule schedule) {
logger.info("set schedule, project id: {}, scheduleId: {}", projectId, schedule.getId());

quartzExecutor.addJob(ProcessScheduleJob.class, projectId, schedule);
schedulerApi.insertOrUpdateScheduleTask(projectId, schedule);
}

/**
Expand All @@ -471,20 +464,7 @@ public void setSchedule(int projectId, Schedule schedule) {
@Override
public void deleteSchedule(int projectId, int scheduleId) {
logger.info("delete schedules of project id:{}, schedule id:{}", projectId, scheduleId);

String jobName = quartzExecutor.buildJobName(scheduleId);
String jobGroupName = quartzExecutor.buildJobGroupName(projectId);

JobKey jobKey = new JobKey(jobName, jobGroupName);
try {
if (scheduler.checkExists(jobKey)) {
logger.info("Try to delete job: {}, group name: {},", jobName, jobGroupName);
scheduler.deleteJob(jobKey);
}
} catch (SchedulerException e) {
logger.error("Failed to delete job: {}", jobKey);
throw new ServiceException("Failed to delete job: " + jobKey);
}
schedulerApi.deleteScheduleTask(projectId, scheduleId);
}

/**
Expand Down Expand Up @@ -671,7 +651,7 @@ private void updateSchedule(Map<String, Object> result,

schedule.setStartTime(scheduleParam.getStartTime());
schedule.setEndTime(scheduleParam.getEndTime());
if (!CronExpression.isValidExpression(scheduleParam.getCrontab())) {
if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
putMsg(result, Status.SCHEDULE_CRON_CHECK_FAILED, scheduleParam.getCrontab());
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

package org.apache.dolphinscheduler.common;

import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;

import java.util.regex.Pattern;

Expand Down Expand Up @@ -502,26 +501,6 @@ private Constants() {
* underline "_"
*/
public static final String UNDERLINE = "_";
/**
* quartz job prifix
*/
public static final String QUARTZ_JOB_PREFIX = "job";
/**
* quartz job group prifix
*/
public static final String QUARTZ_JOB_GROUP_PREFIX = "jobgroup";
/**
* projectId
*/
public static final String PROJECT_ID = "projectId";
/**
* processId
*/
public static final String SCHEDULE_ID = "scheduleId";
/**
* schedule
*/
public static final String SCHEDULE = "schedule";
/**
* application regex
*/
Expand Down
6 changes: 6 additions & 0 deletions dolphinscheduler-master/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-log-server</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-scheduler-quartz</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,14 @@
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.CacheProcessor;
import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;

import javax.annotation.PostConstruct;

import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -52,6 +38,8 @@
import org.springframework.context.annotation.ComponentScan;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.annotation.PostConstruct;

@SpringBootApplication
@ComponentScan("org.apache.dolphinscheduler")
@EnableTransactionManagement
Expand All @@ -72,7 +60,7 @@ public class MasterServer implements IStoppable {
private MasterSchedulerService masterSchedulerService;

@Autowired
private Scheduler scheduler;
private SchedulerApi schedulerApi;

@Autowired
private EventExecuteService eventExecuteService;
Expand Down Expand Up @@ -110,7 +98,7 @@ public void run() throws SchedulerException {
this.eventExecuteService.start();
this.failoverExecuteThread.start();

this.scheduler.start();
this.schedulerApi.start();

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (Stopper.isRunning()) {
Expand Down Expand Up @@ -145,6 +133,7 @@ public void close(String cause) {
logger.warn("thread sleep exception ", e);
}
// close
this.schedulerApi.close();
this.masterSchedulerService.close();
this.masterRPCServer.close();
this.masterRegistryClient.closeRegistry();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,9 @@
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.corn.CronUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-scheduler-plugin</artifactId>
<version>3.0.1-alpha-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>dolphinscheduler-scheduler-api</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-dao</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.scheduler.api;

import org.apache.dolphinscheduler.dao.entity.Schedule;

/**
* This is the interface for scheduler, contains methods to operate schedule task.
*/
public interface SchedulerApi extends AutoCloseable {

/**
* Start the scheduler, if not start, the scheduler will not execute task.
*
* @throws SchedulerException if start failed.
*/
void start() throws SchedulerException;

/**
* @param projectId project id, the schedule task belongs to.
* @param schedule schedule metadata.
* @throws SchedulerException if insert/update failed.
*/
void insertOrUpdateScheduleTask(int projectId, Schedule schedule) throws SchedulerException;

/**
* Delete a schedule task.
*
* @param projectId project id, the schedule task belongs to.
* @param scheduleId schedule id.
* @throws SchedulerException if delete failed.
*/
void deleteScheduleTask(int projectId, int scheduleId) throws SchedulerException;

/**
* Close the scheduler and release the resource.
*
* @throws SchedulerException if close failed.
*/
void close() throws Exception;
}
Loading

0 comments on commit 5c38ae9

Please sign in to comment.