Skip to content

Commit

Permalink
For #195.
Browse files Browse the repository at this point in the history
  • Loading branch information
haocao committed Feb 20, 2017
1 parent 19d6e79 commit 50a9db6
Show file tree
Hide file tree
Showing 26 changed files with 184 additions and 353 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ function renderSettings() {
$("#jobShardingStrategyClass").attr("value", data.jobShardingStrategyClass);
$("#executorServiceHandler").attr("value", data.jobProperties["executor_service_handler"]);
$("#jobExceptionHandler").attr("value", data.jobProperties["job_exception_handler"]);
$("#reconcileIntervalSeconds").attr("value", data.reconcileIntervalSeconds);
$("#reconcileIntervalMinutes").attr("value", data.reconcileIntervalMinutes);
$("#description").text(data.description);
if (!data.monitorExecution) {
$("#execution_info_tab").addClass("disabled");
Expand Down Expand Up @@ -74,8 +74,8 @@ function bindSubmitJobSettingsForm() {
var executorServiceHandler = $("#executorServiceHandler").val();
var jobExceptionHandler = $("#jobExceptionHandler").val();
var description = $("#description").val();
var reconcileIntervalSeconds = $("#reconcileIntervalSeconds").val();
var postJson = {jobName: jobName, jobType : jobType, jobClass : jobClass, shardingTotalCount: shardingTotalCount, jobParameter: jobParameter, cron: cron, streamingProcess: streamingProcess, maxTimeDiffSeconds: maxTimeDiffSeconds, monitorPort: monitorPort, monitorExecution: monitorExecution, failover: failover, misfire: misfire, shardingItemParameters: shardingItemParameters, jobShardingStrategyClass: jobShardingStrategyClass, jobProperties: {"executor_service_handler": executorServiceHandler, "job_exception_handler": jobExceptionHandler}, description: description, scriptCommandLine: scriptCommandLine, reconcileIntervalSeconds:reconcileIntervalSeconds};
var reconcileIntervalMinutes = $("#reconcileIntervalMinutes").val();
var postJson = {jobName: jobName, jobType : jobType, jobClass : jobClass, shardingTotalCount: shardingTotalCount, jobParameter: jobParameter, cron: cron, streamingProcess: streamingProcess, maxTimeDiffSeconds: maxTimeDiffSeconds, monitorPort: monitorPort, monitorExecution: monitorExecution, failover: failover, misfire: misfire, shardingItemParameters: shardingItemParameters, jobShardingStrategyClass: jobShardingStrategyClass, jobProperties: {"executor_service_handler": executorServiceHandler, "job_exception_handler": jobExceptionHandler}, description: description, scriptCommandLine: scriptCommandLine, reconcileIntervalMinutes:reconcileIntervalMinutes};
$.post("job/settings", postJson, function() {
showSuccessDialog();
if (monitorExecution) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@
<input type="number" id="monitorPort" name="monitorPort" class="form-control" data-toggle="tooltip" data-placement="bottom" title="抓取作业注册信息监听服务端口。配置为-1表示不启用监听服务。" />
</div>

<label for="reconcileIntervalSeconds" class="col-sm-2 control-label">作业服务器状态修复周期</label>
<label for="reconcileIntervalMinutes" class="col-sm-2 control-label">作业服务器状态修复周期</label>
<div class="col-sm-2">
<input type="number" id="reconcileIntervalSeconds" name="reconcileIntervalSeconds" class="form-control" data-toggle="tooltip" data-placement="bottom" title="修复错误状态的作业服务器扫描修复周期。配置为-1表示不启用修复动作。" />
<input type="number" id="reconcileIntervalMinutes" name="reconcileIntervalMinutes" class="form-control" data-toggle="tooltip" data-placement="bottom" title="修复错误状态的作业服务器扫描修复周期。配置为-1表示不启用修复动作。" />
</div>
</div>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class LiteJobConfiguration implements JobRootConfiguration {

private final boolean overwrite;

private final int reconcileIntervalSeconds;
private final int reconcileIntervalMinutes;

/**
* 获取作业名称.
Expand Down Expand Up @@ -95,7 +95,7 @@ public static class Builder {

private boolean overwrite;

private int reconcileIntervalSeconds = 60;
private int reconcileIntervalMinutes = -1;

/**
* 设置监控作业执行时状态.
Expand Down Expand Up @@ -194,18 +194,18 @@ public Builder overwrite(final boolean overwrite) {
}

/**
* 设置监视作业服务器状态的reconcile线程执行间隔秒数.
* 设置监视作业服务器状态的reconcile线程执行间隔分钟数.
*
* <p>
* 每隔一段时间监视作业服务器的状态,如果不正确则重新分片。
* </p>
*
* @param reconcileIntervalSeconds reconcile线程执行间隔秒数
* @param reconcileIntervalMinutes reconcile线程执行间隔分钟数
*
* @return 作业配置构建器
*/
public Builder reconcileIntervalSeconds(final int reconcileIntervalSeconds) {
this.reconcileIntervalSeconds = reconcileIntervalSeconds;
public Builder reconcileIntervalMinutes(final int reconcileIntervalMinutes) {
this.reconcileIntervalMinutes = reconcileIntervalMinutes;
return this;
}

Expand All @@ -215,7 +215,7 @@ public Builder reconcileIntervalSeconds(final int reconcileIntervalSeconds) {
* @return 作业配置对象
*/
public final LiteJobConfiguration build() {
return new LiteJobConfiguration(jobConfig, monitorExecution, maxTimeDiffSeconds, monitorPort, jobShardingStrategyClass, disabled, overwrite, reconcileIntervalSeconds);
return new LiteJobConfiguration(jobConfig, monitorExecution, maxTimeDiffSeconds, monitorPort, jobShardingStrategyClass, disabled, overwrite, reconcileIntervalMinutes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ protected void addToCustomizedValueMap(final String jsonName, final JsonReader i
case "overwrite":
customizedValueMap.put("overwrite", in.nextBoolean());
break;
case "reconcileIntervalSeconds":
customizedValueMap.put("reconcileIntervalSeconds", in.nextInt());
case "reconcileIntervalMinutes":
customizedValueMap.put("reconcileIntervalMinutes", in.nextInt());
break;
default:
in.skipValue();
Expand Down Expand Up @@ -129,8 +129,8 @@ protected LiteJobConfiguration getJobRootConfiguration(final JobTypeConfiguratio
if (customizedValueMap.containsKey("overwrite")) {
builder.overwrite((boolean) customizedValueMap.get("overwrite"));
}
if (customizedValueMap.containsKey("reconcileIntervalSeconds")) {
builder.reconcileIntervalSeconds((int) customizedValueMap.get("reconcileIntervalSeconds"));
if (customizedValueMap.containsKey("reconcileIntervalMinutes")) {
builder.reconcileIntervalMinutes((int) customizedValueMap.get("reconcileIntervalMinutes"));
}
return builder.build();
}
Expand All @@ -143,7 +143,7 @@ protected void writeCustomized(final JsonWriter out, final LiteJobConfiguration
out.name("jobShardingStrategyClass").value(value.getJobShardingStrategyClass());
out.name("disabled").value(value.isDisabled());
out.name("overwrite").value(value.isOverwrite());
out.name("reconcileIntervalSeconds").value(value.getReconcileIntervalSeconds());
out.name("reconcileIntervalMinutes").value(value.getReconcileIntervalMinutes());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.dangdang.ddframe.job.lite.internal.guarantee.GuaranteeListenerManager;
import com.dangdang.ddframe.job.lite.internal.server.JobOperationListenerManager;
import com.dangdang.ddframe.job.lite.internal.sharding.ShardingListenerManager;
import com.dangdang.ddframe.job.lite.internal.worker.reconcile.ReconcileListenerManager;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;

import java.util.List;
Expand All @@ -51,8 +50,6 @@ public class ListenerManager {

private final GuaranteeListenerManager guaranteeListenerManager;

private final ReconcileListenerManager reconcileListenerManager;

public ListenerManager(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners) {
electionListenerManager = new ElectionListenerManager(regCenter, jobName);
shardingListenerManager = new ShardingListenerManager(regCenter, jobName);
Expand All @@ -61,7 +58,6 @@ public ListenerManager(final CoordinatorRegistryCenter regCenter, final String j
jobOperationListenerManager = new JobOperationListenerManager(regCenter, jobName);
configurationListenerManager = new ConfigurationListenerManager(regCenter, jobName);
guaranteeListenerManager = new GuaranteeListenerManager(regCenter, jobName, elasticJobListeners);
reconcileListenerManager = new ReconcileListenerManager(regCenter, jobName);
}

/**
Expand All @@ -75,7 +71,6 @@ public void startAllListeners() {
jobOperationListenerManager.start();
configurationListenerManager.start();
guaranteeListenerManager.start();
reconcileListenerManager.start();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.job.lite.internal.reconcile;

import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.internal.config.ConfigurationService;
import com.dangdang.ddframe.job.lite.internal.election.LeaderElectionService;
import com.dangdang.ddframe.job.lite.internal.sharding.ShardingService;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.google.common.util.concurrent.AbstractScheduledService;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

/**
* 修复作业服务器不一致状态服务.
*
* @author caohao
*
*/
@Slf4j
public class ReconcileService extends AbstractScheduledService {

private long lastReconcileTime;

private final ConfigurationService configService;

private final ShardingService shardingService;

private final LeaderElectionService leaderElectionService;

public ReconcileService(final CoordinatorRegistryCenter regCenter, final String jobName) {
lastReconcileTime = System.currentTimeMillis();
configService = new ConfigurationService(regCenter, jobName);
shardingService = new ShardingService(regCenter, jobName);
leaderElectionService = new LeaderElectionService(regCenter, jobName);
}

@Override
protected void runOneIteration() throws Exception {
LiteJobConfiguration config = configService.load(true);
int reconcileIntervalMinutes = null == config || config.getReconcileIntervalMinutes() <= 0 ? -1 : config.getReconcileIntervalMinutes();
if (reconcileIntervalMinutes > 0 && (System.currentTimeMillis() - lastReconcileTime >= reconcileIntervalMinutes * 60 * 1000)) {
lastReconcileTime = System.currentTimeMillis();
if (leaderElectionService.isLeader() && !shardingService.isNeedSharding()
&& shardingService.hasNotRunningShardingNode()) {
log.warn("Elastic Job: job status node has inconsistent value,start reconciling...");
shardingService.setReshardingFlag();
}
}
}

@Override
protected Scheduler scheduler() {
return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MINUTES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
import com.dangdang.ddframe.job.lite.internal.monitor.MonitorService;
import com.dangdang.ddframe.job.lite.internal.server.ServerService;
import com.dangdang.ddframe.job.lite.internal.sharding.ShardingService;
import com.dangdang.ddframe.job.lite.internal.worker.WorkersManager;
import com.dangdang.ddframe.job.lite.internal.worker.reconcile.ReconcileWorker;
import com.dangdang.ddframe.job.lite.internal.reconcile.ReconcileService;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;

import java.util.List;
Expand All @@ -53,7 +52,7 @@ public class SchedulerFacade {

private final ListenerManager listenerManager;

private final WorkersManager workersManager;
private final ReconcileService reconcileService;

public SchedulerFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners) {
configService = new ConfigurationService(regCenter, jobName);
Expand All @@ -62,8 +61,8 @@ public SchedulerFacade(final CoordinatorRegistryCenter regCenter, final String j
shardingService = new ShardingService(regCenter, jobName);
executionService = new ExecutionService(regCenter, jobName);
monitorService = new MonitorService(regCenter, jobName);
reconcileService = new ReconcileService(regCenter, jobName);
listenerManager = new ListenerManager(regCenter, jobName, elasticJobListeners);
workersManager = new WorkersManager(regCenter, jobName);
}

/**
Expand All @@ -87,7 +86,7 @@ public void registerStartUpInfo(final LiteJobConfiguration liteJobConfig) {
shardingService.setReshardingFlag();
monitorService.listen();
listenerManager.setCurrentShardingTotalCount(configService.load(false).getTypeConfig().getCoreConfig().getShardingTotalCount());
workersManager.start();
reconcileService.startAsync();
}

/**
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 50a9db6

Please sign in to comment.