Skip to content

Commit

Permalink
fix 0.3.5 code review ban and enable jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
gelxiogong committed Nov 29, 2023
1 parent d03b726 commit 035bb56
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,17 @@ object JobConf {

val HIGHAVAILABLE_SOURCE: CommonVars[String] = CommonVars("wds.streamis.app.highavailable.source", "aomp")

val HIGHAVAILABLE_POLICY: CommonVars[String] = CommonVars("wds.streamis.app.highavailable.policy", "double")
// val HIGHAVAILABLE_POLICY: CommonVars[String] = CommonVars("wds.streamis.app.highavailable.policy.double", "double")

val HIGHAVAILABLE_POLICY_DOUBLE_BAK: CommonVars[String] = CommonVars("wds.streamis.app.highavailable.policy", "doubleWithBak")
val HIGHAVAILABLE_POLICY_DOUBLE: CommonVars[String] = CommonVars("wds.streamis.app.highavailable.policy.double", "double")

val HIGHAVAILABLE_POLICY_SINGLE_BAK: CommonVars[String] = CommonVars("wds.streamis.app.highavailable.policy", "singleWithBak")
val HIGHAVAILABLE_POLICY_DOUBLE_BAK: CommonVars[String] = CommonVars("wds.streamis.app.highavailable.policy.doubleWithBak", "doubleWithBak")

val HIGHAVAILABLE_POLICY_SINGLE_BAK: CommonVars[String] = CommonVars("wds.streamis.app.highavailable.policy.singleWithBak", "singleWithBak")

val HIGHAVAILABLE_POLICY_MANAGERSLAVE: CommonVars[String] = CommonVars("wds.streamis.app.highavailable.policy.managerSlave", "managerSlave")

val HIGHAVAILABLE_POLICY_MANAGERSLAVE_BAK: CommonVars[String] = CommonVars("wds.streamis.app.highavailable.policy.managerSlaveWithBak", "managerSlaveWithBak")

val HIGHAVAILABLE_DEFAULT_POLICY: CommonVars[String] = CommonVars("wds.streamis.app.highavailable.default.policy", "single")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static JobHighAvailableVo manageJobProjectFile(String highAvailablePolicy
return highAvailableVo;
} else {
//查job conf wds.streamis.app.highavailable.policy 值
if (highAvailablePolicy.equals(JobConf.HIGHAVAILABLE_POLICY().getValue()) || highAvailablePolicy.equals(JobConf.HIGHAVAILABLE_POLICY_DOUBLE_BAK().getValue())){
if (highAvailablePolicy.equals(JobConf.HIGHAVAILABLE_POLICY_DOUBLE().getValue()) || highAvailablePolicy.equals(JobConf.HIGHAVAILABLE_POLICY_DOUBLE_BAK().getValue())){
Map map = BDPJettyServerHelper.gson().fromJson(source, Map.class);
if (map.containsKey("source")) {
String sourceValue = map.get("source").toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class DefaultStreamJobInspectService extends StreamJobInspectService with Loggin
val jobVersion = if (StringUtils.isBlank(version)) {
streamJobMapper.getJobVersions(jobId).get(0)
} else streamJobMapper.getJobVersionById(jobId, version)
val highAvailablePolicy = streamJobConfMapper.getRawConfValue(jobId, JobConf.HIGHAVAILABLE_POLICY.key)
val highAvailablePolicy = streamJobConfMapper.getRawConfValue(jobId, JobConf.HIGHAVAILABLE_POLICY_KEY.getValue)
val sourceOption: Option[String] = Option(jobVersion.getSource)
sourceOption match {
case Some(source) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,13 +398,10 @@ class DefaultStreamJobService extends StreamJobService with Logging {
if (streamJob.getStatus == 0){
val streamTask = this.streamTaskMapper.getLatestByJobId(jobId)
if (streamTask != null) {
val jobStatusVo = new JobStatusVo()
jobStatusVo.setStatusCode(streamTask.getStatus)
jobStatusVo.setStatus(JobConf.getStatusString(streamTask.getStatus))
jobStatusVo.setJobId(streamTask.getJobId)
jobStatusVo.setMessage(streamTask.getErrDesc)
if (!JobConf.isFinished(jobStatusVo.getStatusCode)) {
logger.warn(s"StreamJob-${jobId} is in status ${jobStatusVo.getStatus}, the job has not completed, can not be disabled")
val statusCode = streamTask.getStatus
val status = JobConf.getStatusString(statusCode)
if (!JobConf.isFinished(statusCode)) {
logger.warn(s"StreamJob-${jobId} is in status ${status}, the job has not completed, can not be disabled")
return false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class HighAvailableServiceImpl implements HighAvailableService {
@Override
public JobHighAvailableVo getJobHighAvailableVo(long jobId){
StreamJobVersion jobVersion = this.defaultStreamJobService.getLatestJobVersion(jobId);
String highAvailablePolicy = this.streamJobConfService.getJobConfValue(jobId, JobConf.HIGHAVAILABLE_POLICY().key());
String highAvailablePolicy = this.streamJobConfService.getJobConfValue(jobId, JobConf.HIGHAVAILABLE_POLICY_KEY().getValue());
JobHighAvailableVo inspectVo = new JobHighAvailableVo();
Optional<String> sourceOption = Optional.ofNullable(jobVersion.getSource());
if(sourceOption.isPresent() && JsonUtil.isJson(sourceOption.get())) {
Expand Down

0 comments on commit 035bb56

Please sign in to comment.