Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: 脚本任务执行结果拉取优化 #2919 #2997

Merged
merged 12 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

package com.tencent.bk.job.common.util.date;

import com.tencent.bk.job.common.util.json.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.helpers.MessageFormatter;
Expand All @@ -42,12 +41,9 @@
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.Formatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public GseV2ApiClient(MeterRegistry meterRegistry,
HttpHelperFactory.createHttpHelper(
15000,
15000,
15000,
60000,
1000,
2000,
60,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.tencent.bk.job.common.gse.v2.model;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
Expand Down Expand Up @@ -48,6 +49,7 @@ public static class AgentTask {
}

@Data
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class AtomicTask {
/**
* id 编号,在当前任务里面唯一,需要取大于等于0的值
Expand All @@ -59,15 +61,34 @@ public static class AtomicTask {
* 执行日志读取偏移量,单位byte
*/
private int offset;

/**
* 执行日志读取大小上限,单位byte
*/
private Integer limit;
}

public void addAgentTaskQuery(ExecuteObjectGseKey executeObjectGseKey, Integer atomicTaskId, int offset) {
/**
 * Appends a query condition for one agent to this request.
 *
 * @param executeObjectGseKey GSE key of the execute object; provides the agent id and container id
 * @param atomicTaskId        atomic task id, unique within the current task; must be >= 0
 * @param offset              read offset into the execution log, in bytes
 * @param limit               upper bound on the amount of execution log to read, in bytes;
 *                            {@code null} or a non-positive value is ignored and leaves the
 *                            limit unset (no limit)
 */
public void addAgentTaskQuery(ExecuteObjectGseKey executeObjectGseKey,
                              Integer atomicTaskId,
                              int offset,
                              Integer limit) {
    // Describe which atomic task to read and from where.
    AtomicTask logQuery = new AtomicTask();
    logQuery.setAtomicTaskId(atomicTaskId);
    logQuery.setOffset(offset);
    boolean hasEffectiveLimit = limit != null && limit > 0;
    if (hasEffectiveLimit) {
        logQuery.setLimit(limit);
    }

    // Bind the atomic task query to the agent identified by the GSE key.
    AgentTask agentQuery = new AgentTask();
    agentQuery.setAgentId(executeObjectGseKey.getAgentId());
    agentQuery.setContainerId(executeObjectGseKey.getContainerId());
    agentQuery.setAtomicTasks(Collections.singletonList(logQuery));

    agentTasks.add(agentQuery);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.tencent.bk.job.backup.dao.impl;

import com.tencent.bk.job.execute.model.tables.GseFileAgentTask;
import com.tencent.bk.job.execute.model.tables.GseScriptAgentTask;
import com.tencent.bk.job.execute.model.tables.records.GseScriptAgentTaskRecord;
import org.jooq.DSLContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,11 @@ public class JobExecuteConfig {

@Value("${gse.script.rootPath:/tmp/bkjob}")
private String gseScriptFileRootPath;

/**
 * Maximum length, in bytes, of the execution output returned by a single call to the
 * GSE script task execution result query API.
 * Default: 512 MiB (536870912 bytes).
 */
@Value("${job.execute.scriptTask.query.contentSizeLimit:536870912}")
private int scriptTaskQueryContentSizeLimit;
}
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ protected void addResultHandleTask() {
fileExecuteObjectTaskService,
stepInstanceService,
gseClient,
jobExecuteConfig,
taskInstance,
stepInstance,
taskVariablesAnalyzeResult,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,7 @@ protected final void addResultHandleTask() {
scriptExecuteObjectTaskService,
stepInstanceService,
gseClient,
jobExecuteConfig,
taskInstance,
stepInstance,
taskVariablesAnalyzeResult,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.tencent.bk.job.common.util.FilePathUtils;
import com.tencent.bk.job.execute.common.constants.RunStatusEnum;
import com.tencent.bk.job.execute.config.FileDistributeConfig;
import com.tencent.bk.job.execute.config.JobExecuteConfig;
import com.tencent.bk.job.execute.engine.evict.TaskEvictPolicyExecutor;
import com.tencent.bk.job.execute.engine.listener.event.ResultHandleTaskResumeEvent;
import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteMQEventDispatcher;
Expand Down Expand Up @@ -97,6 +98,8 @@ public class ResultHandleResumeListener {
private final StepInstanceService stepInstanceService;
private final GseClient gseClient;

private final JobExecuteConfig jobExecuteConfig;

@Autowired
public ResultHandleResumeListener(TaskInstanceService taskInstanceService,
ResultHandleManager resultHandleManager,
Expand All @@ -111,7 +114,8 @@ public ResultHandleResumeListener(TaskInstanceService taskInstanceService,
ScriptExecuteObjectTaskService scriptExecuteObjectTaskService,
FileExecuteObjectTaskService fileExecuteObjectTaskService,
StepInstanceService stepInstanceService,
GseClient gseClient) {
GseClient gseClient,
JobExecuteConfig jobExecuteConfig) {
this.taskInstanceService = taskInstanceService;
this.resultHandleManager = resultHandleManager;
this.taskInstanceVariableService = taskInstanceVariableService;
Expand All @@ -126,6 +130,7 @@ public ResultHandleResumeListener(TaskInstanceService taskInstanceService,
this.fileExecuteObjectTaskService = fileExecuteObjectTaskService;
this.stepInstanceService = stepInstanceService;
this.gseClient = gseClient;
this.jobExecuteConfig = jobExecuteConfig;
}


Expand Down Expand Up @@ -191,6 +196,7 @@ private void resumeScriptTask(TaskInstanceDTO taskInstance,
scriptExecuteObjectTaskService,
stepInstanceService,
gseClient,
jobExecuteConfig,
taskInstance,
stepInstance,
taskVariablesAnalyzeResult,
Expand Down Expand Up @@ -240,6 +246,7 @@ private void resumeFileTask(TaskInstanceDTO taskInstance,
fileExecuteObjectTaskService,
stepInstanceService,
gseClient,
jobExecuteConfig,
taskInstance,
stepInstance,
taskVariablesAnalyzeResult,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,6 @@
@Data
@NoArgsConstructor
public class GseLogBatchPullResult<T> {
/**
* 是否成功拉取到日志
*/
private boolean success;
/**
* 错误信息
*/
private String errorMsg;
/**
* 是否最后一批
*/
Expand All @@ -50,10 +42,8 @@ public class GseLogBatchPullResult<T> {
*/
private GseTaskResult<T> gseTaskResult;

public GseLogBatchPullResult(boolean success, boolean isLastBatch, GseTaskResult<T> gseTaskResult, String errorMsg) {
this.success = success;
public GseLogBatchPullResult(boolean isLastBatch, GseTaskResult<T> gseTaskResult) {
this.lastBatch = isLastBatch;
this.gseTaskResult = gseTaskResult;
this.errorMsg = errorMsg;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.tencent.bk.job.common.util.date.DateUtils;
import com.tencent.bk.job.common.util.json.JsonUtils;
import com.tencent.bk.job.execute.common.constants.RunStatusEnum;
import com.tencent.bk.job.execute.config.JobExecuteConfig;
import com.tencent.bk.job.execute.engine.consts.ExecuteObjectTaskStatusEnum;
import com.tencent.bk.job.execute.engine.evict.TaskEvictPolicyExecutor;
import com.tencent.bk.job.execute.engine.listener.event.EventSource;
Expand Down Expand Up @@ -210,6 +211,7 @@ public abstract class AbstractResultHandleTask<T> implements ContinuousScheduled
*/
protected String gseTaskInfo;

protected JobExecuteConfig jobExecuteConfig;

protected AbstractResultHandleTask(TaskInstanceService taskInstanceService,
GseTaskService gseTaskService,
Expand All @@ -222,6 +224,7 @@ protected AbstractResultHandleTask(TaskInstanceService taskInstanceService,
ExecuteObjectTaskService executeObjectTaskService,
StepInstanceService stepInstanceService,
GseClient gseClient,
JobExecuteConfig jobExecuteConfig,
TaskInstanceDTO taskInstance,
StepInstanceDTO stepInstance,
TaskVariablesAnalyzeResult taskVariablesAnalyzeResult,
Expand All @@ -240,6 +243,7 @@ protected AbstractResultHandleTask(TaskInstanceService taskInstanceService,
this.executeObjectTaskService = executeObjectTaskService;
this.stepInstanceService = stepInstanceService;
this.gseClient = gseClient;
this.jobExecuteConfig = jobExecuteConfig;
this.requestId = requestId;
this.taskInstance = taskInstance;
this.taskInstanceId = taskInstance.getId();
Expand Down Expand Up @@ -317,11 +321,6 @@ private boolean pullGSEResultAndAnalyse(StopWatch watch) {
// 分批拉取GSE任务执行结果
gseLogBatchPullResult = pullGseTaskResultInBatches();

// 拉取结果校验
if (!checkPullResult(gseLogBatchPullResult)) {
return false;
}

// 检查任务异常并处理
GseTaskResult<T> gseTaskResult = gseLogBatchPullResult.getGseTaskResult();
if (determineTaskAbnormal(gseTaskResult)) {
Expand Down Expand Up @@ -503,20 +502,6 @@ private boolean checkEmptyGseResult(GseTaskResult<?> gseTaskResult) {
return isAbnormal;
}

/**
 * Checks whether the batch pull of the GSE task result succeeded.
 * <p>
 * When the pull failed, the error is logged, the execute result is set to FAILED, the failure
 * info is saved for unfinished execute object tasks with status LOG_ERROR, and the GSE task is
 * finished with the FAILED result.
 *
 * @param gseLogBatchPullResult result of the batch pull
 * @return {@code true} if the pull succeeded, {@code false} otherwise
 */
private boolean checkPullResult(GseLogBatchPullResult<T> gseLogBatchPullResult) {
    if (gseLogBatchPullResult.isSuccess()) {
        return true;
    }

    String pullErrorMsg = gseLogBatchPullResult.getErrorMsg();
    log.error("[{}] Pull gse task result error, errorMsg: {}", gseTaskInfo, pullErrorMsg);
    this.executeResult = GseTaskExecuteResult.FAILED;
    saveFailInfoForUnfinishedExecuteObjectTask(ExecuteObjectTaskStatusEnum.LOG_ERROR, pullErrorMsg);
    finishGseTask(GseTaskExecuteResult.FAILED, true);
    return false;
}


/**
* Set the final status of the target Agent tasks
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.tencent.bk.job.common.util.ip.IpUtils;
import com.tencent.bk.job.common.util.json.JsonUtils;
import com.tencent.bk.job.execute.common.constants.FileDistStatusEnum;
import com.tencent.bk.job.execute.config.JobExecuteConfig;
import com.tencent.bk.job.execute.engine.consts.ExecuteObjectTaskStatusEnum;
import com.tencent.bk.job.execute.engine.evict.TaskEvictPolicyExecutor;
import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteMQEventDispatcher;
Expand Down Expand Up @@ -175,6 +176,7 @@ public FileResultHandleTask(TaskInstanceService taskInstanceService,
FileExecuteObjectTaskService fileExecuteObjectTaskService,
StepInstanceService stepInstanceService,
GseClient gseClient,
JobExecuteConfig jobExecuteConfig,
TaskInstanceDTO taskInstance,
StepInstanceDTO stepInstance,
TaskVariablesAnalyzeResult taskVariablesAnalyzeResult,
Expand All @@ -195,6 +197,7 @@ public FileResultHandleTask(TaskInstanceService taskInstanceService,
fileExecuteObjectTaskService,
stepInstanceService,
gseClient,
jobExecuteConfig,
taskInstance,
stepInstance,
taskVariablesAnalyzeResult,
Expand Down Expand Up @@ -267,10 +270,9 @@ GseLogBatchPullResult<FileTaskResult> pullGseTaskResultInBatches() {
FileTaskResult result = gseClient.getTransferFileResult(request);
GseLogBatchPullResult<FileTaskResult> pullResult;
if (result != null) {
pullResult = new GseLogBatchPullResult<>(
true, true, new FileGseTaskResult(result), null);
pullResult = new GseLogBatchPullResult<>(true, new FileGseTaskResult(result));
} else {
pullResult = new GseLogBatchPullResult<>(true, true, null, null);
pullResult = new GseLogBatchPullResult<>(true, null);
}
return pullResult;
}
Expand Down
Loading
Loading