-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feature: Support New Sharding Strategy To Run Balanceful On Multiple …
…Servers For Single Sharding Scenario (#2462) * feature: #2461 * fix problems of code formatting validation * remove some comments --------- Co-authored-by: baiyangzhuhong <baiyangzhuhong@163.com>
- Loading branch information
1 parent
88378d2
commit a5cfb4a
Showing
10 changed files
with
994 additions
and
152 deletions.
There are no files selected for viewing
234 changes: 234 additions & 0 deletions
234
...n/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/AbstractJobFacade.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,234 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.shardingsphere.elasticjob.kernel.executor.facade; | ||
|
||
import com.google.common.base.Strings; | ||
|
||
import java.util.Collection; | ||
import java.util.Comparator; | ||
import java.util.stream.Collectors; | ||
|
||
import org.apache.shardingsphere.elasticjob.api.JobConfiguration; | ||
import org.apache.shardingsphere.elasticjob.kernel.infra.exception.JobExecutionEnvironmentException; | ||
import org.apache.shardingsphere.elasticjob.kernel.internal.config.ConfigurationService; | ||
import org.apache.shardingsphere.elasticjob.kernel.internal.context.TaskContext; | ||
import org.apache.shardingsphere.elasticjob.kernel.internal.failover.FailoverService; | ||
import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionContextService; | ||
import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionService; | ||
import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ShardingService; | ||
import org.apache.shardingsphere.elasticjob.kernel.tracing.config.TracingConfiguration; | ||
import org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobTracingEventBus; | ||
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; | ||
import org.apache.shardingsphere.elasticjob.spi.executor.item.param.JobRuntimeService; | ||
import org.apache.shardingsphere.elasticjob.spi.listener.ElasticJobListener; | ||
import org.apache.shardingsphere.elasticjob.spi.listener.param.ShardingContexts; | ||
import org.apache.shardingsphere.elasticjob.spi.tracing.event.JobExecutionEvent; | ||
import org.apache.shardingsphere.elasticjob.spi.tracing.event.JobStatusTraceEvent; | ||
import org.apache.shardingsphere.elasticjob.spi.tracing.event.JobStatusTraceEvent.State; | ||
|
||
import lombok.extern.slf4j.Slf4j; | ||
|
||
/** | ||
* Abstract Job facade. | ||
*/ | ||
@Slf4j | ||
abstract class AbstractJobFacade implements JobFacade { | ||
|
||
private final ConfigurationService configService; | ||
|
||
private final ShardingService shardingService; | ||
|
||
private final ExecutionContextService executionContextService; | ||
|
||
private final ExecutionService executionService; | ||
|
||
private final FailoverService failoverService; | ||
|
||
private final Collection<ElasticJobListener> elasticJobListeners; | ||
|
||
private final JobTracingEventBus jobTracingEventBus; | ||
|
||
AbstractJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final Collection<ElasticJobListener> elasticJobListeners, final TracingConfiguration<?> tracingConfig) { | ||
configService = new ConfigurationService(regCenter, jobName); | ||
shardingService = new ShardingService(regCenter, jobName); | ||
executionContextService = new ExecutionContextService(regCenter, jobName); | ||
executionService = new ExecutionService(regCenter, jobName); | ||
failoverService = new FailoverService(regCenter, jobName); | ||
this.elasticJobListeners = elasticJobListeners.stream().sorted(Comparator.comparingInt(ElasticJobListener::order)).collect(Collectors.toList()); | ||
this.jobTracingEventBus = null == tracingConfig ? new JobTracingEventBus() : new JobTracingEventBus(tracingConfig); | ||
} | ||
|
||
/** | ||
* Load job configuration. | ||
* | ||
* @param fromCache load from cache or not | ||
* @return job configuration | ||
*/ | ||
@Override | ||
public JobConfiguration loadJobConfiguration(final boolean fromCache) { | ||
return configService.load(fromCache); | ||
} | ||
|
||
/** | ||
* Check job execution environment. | ||
* | ||
* @throws org.apache.shardingsphere.elasticjob.kernel.infra.exception.JobExecutionEnvironmentException job execution environment exception | ||
*/ | ||
@Override | ||
public void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException { | ||
configService.checkMaxTimeDiffSecondsTolerable(); | ||
} | ||
|
||
/** | ||
* Failover If necessary. | ||
*/ | ||
@Override | ||
public void failoverIfNecessary() { | ||
if (configService.load(true).isFailover()) { | ||
failoverService.failoverIfNecessary(); | ||
} | ||
} | ||
|
||
/** | ||
* Register job begin. | ||
* | ||
* @param shardingContexts sharding contexts | ||
*/ | ||
@Override | ||
public void registerJobBegin(final ShardingContexts shardingContexts) { | ||
executionService.registerJobBegin(shardingContexts); | ||
} | ||
|
||
/** | ||
* Register job completed. | ||
* | ||
* @param shardingContexts sharding contexts | ||
*/ | ||
@Override | ||
public void registerJobCompleted(final ShardingContexts shardingContexts) { | ||
executionService.registerJobCompleted(shardingContexts); | ||
if (configService.load(true).isFailover()) { | ||
failoverService.updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet()); | ||
} | ||
} | ||
|
||
public abstract ShardingContexts getShardingContexts(); | ||
|
||
/** | ||
* Set task misfire flag. | ||
* | ||
* @param shardingItems sharding items to be set misfire flag | ||
* @return whether satisfy misfire condition | ||
*/ | ||
@Override | ||
public boolean misfireIfRunning(final Collection<Integer> shardingItems) { | ||
return executionService.misfireIfHasRunningItems(shardingItems); | ||
} | ||
|
||
/** | ||
* Clear misfire flag. | ||
* | ||
* @param shardingItems sharding items to be cleared misfire flag | ||
*/ | ||
@Override | ||
public void clearMisfire(final Collection<Integer> shardingItems) { | ||
executionService.clearMisfire(shardingItems); | ||
} | ||
|
||
/** | ||
* Judge job whether to need to execute misfire tasks. | ||
* | ||
* @param shardingItems sharding items | ||
* @return need to execute misfire tasks or not | ||
*/ | ||
@Override | ||
public boolean isExecuteMisfired(final Collection<Integer> shardingItems) { | ||
return configService.load(true).isMisfire() && !isNeedSharding() && !executionService.getMisfiredJobItems(shardingItems).isEmpty(); | ||
} | ||
|
||
/** | ||
* Judge job whether to need resharding. | ||
* | ||
* @return need resharding or not | ||
*/ | ||
@Override | ||
public boolean isNeedSharding() { | ||
return shardingService.isNeedSharding(); | ||
} | ||
|
||
/** | ||
* Call before job executed. | ||
* | ||
* @param shardingContexts sharding contexts | ||
*/ | ||
@Override | ||
public void beforeJobExecuted(final ShardingContexts shardingContexts) { | ||
for (ElasticJobListener each : elasticJobListeners) { | ||
each.beforeJobExecuted(shardingContexts); | ||
} | ||
} | ||
|
||
/** | ||
* Call after job executed. | ||
* | ||
* @param shardingContexts sharding contexts | ||
*/ | ||
@Override | ||
public void afterJobExecuted(final ShardingContexts shardingContexts) { | ||
for (ElasticJobListener each : elasticJobListeners) { | ||
each.afterJobExecuted(shardingContexts); | ||
} | ||
} | ||
|
||
/** | ||
* Post job execution event. | ||
* | ||
* @param jobExecutionEvent job execution event | ||
*/ | ||
@Override | ||
public void postJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) { | ||
jobTracingEventBus.post(jobExecutionEvent); | ||
} | ||
|
||
/** | ||
* Post job status trace event. | ||
* | ||
* @param taskId task Id | ||
* @param state job state | ||
* @param message job message | ||
*/ | ||
@Override | ||
public void postJobStatusTraceEvent(final String taskId, final State state, final String message) { | ||
TaskContext taskContext = TaskContext.from(taskId); | ||
jobTracingEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(), | ||
taskContext.getSlaveId(), taskContext.getType(), taskContext.getMetaInfo().getShardingItems().toString(), state, message)); | ||
if (!Strings.isNullOrEmpty(message)) { | ||
log.trace(message); | ||
} | ||
} | ||
|
||
/** | ||
* Get job runtime service. | ||
* | ||
* @return job runtime service | ||
*/ | ||
@Override | ||
public JobRuntimeService getJobRuntimeService() { | ||
return new JobJobRuntimeServiceImpl(this); | ||
} | ||
} |
Oops, something went wrong.