Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: 脚本任务执行结果拉取优化 #2919 #2997

Merged
merged 12 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public void checkVolumeAndClear() {
log.info("maxSizeStr is blank, ignore checkVolumeAndClear");
return;
}
Long maxSizeBytes = FileSizeUtil.parseFileSizeBytes(maxSizeStr);
if (maxSizeBytes == null || maxSizeBytes <= 0) {
long maxSizeBytes = FileSizeUtil.parseFileSizeBytes(maxSizeStr);
if (maxSizeBytes <= 0) {
log.error("Cannot parse valid maxSizeBytes from maxSizeStr {}, ignore checkVolumeAndClear", maxSizeStr);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

package com.tencent.bk.job.common.util.date;

import com.tencent.bk.job.common.util.json.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.helpers.MessageFormatter;
Expand All @@ -42,12 +41,9 @@
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.Formatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.helpers.MessageFormatter;

@Slf4j
public class FileSizeUtil {
Expand Down Expand Up @@ -55,9 +54,9 @@ public static String getFileSizeStr(Long byteNum) {
* @param fileSizeStr 带单位的文件大小描述
* @return 真实表示的字节数
*/
public static Long parseFileSizeBytes(String fileSizeStr) {
public static long parseFileSizeBytes(String fileSizeStr) {
if (StringUtils.isBlank(fileSizeStr)) {
return null;
throw new IllegalArgumentException("Invalid fileSize : " + fileSizeStr);
}
long factor = 1L;
fileSizeStr = fileSizeStr.trim().toUpperCase();
Expand All @@ -84,15 +83,6 @@ public static Long parseFileSizeBytes(String fileSizeStr) {
fileSizeStr = fileSizeStr.replace("K", "");
factor = 1024L;
}
try {
return Math.round(Double.parseDouble(fileSizeStr) * factor);
} catch (Exception e) {
String msg = MessageFormatter.format(
"Fail to parse double from fileSizeStr:{}",
fileSizeStr
).getMessage();
log.warn(msg, e);
return null;
}
return Math.round(Double.parseDouble(fileSizeStr) * factor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;

public class FileSizeUtilTest {
Expand All @@ -45,16 +46,15 @@ void testGetFileSizeStr() {

@Test
void testParseFileSizeBytes() {
assertThat(FileSizeUtil.parseFileSizeBytes(null)).isNull();
assertThat(FileSizeUtil.parseFileSizeBytes("")).isNull();
assertThat(FileSizeUtil.parseFileSizeBytes(" ")).isNull();
assertThat(FileSizeUtil.parseFileSizeBytes("1B")).isEqualTo(1L);
assertThat(FileSizeUtil.parseFileSizeBytes("1KB")).isEqualTo(1024L);
assertThat(FileSizeUtil.parseFileSizeBytes("1.5KB")).isEqualTo(1536L);
assertThat(FileSizeUtil.parseFileSizeBytes("1MB")).isEqualTo(1024L * 1024L);
assertThat(FileSizeUtil.parseFileSizeBytes("1GB")).isEqualTo(1024L * 1024L * 1024L);
assertThat(FileSizeUtil.parseFileSizeBytes("1TB")).isEqualTo(1024L * 1024L * 1024L * 1024L);
assertThat(FileSizeUtil.parseFileSizeBytes("1PB")).isEqualTo(1024L * 1024L * 1024L * 1024L * 1024L);
assertThat(FileSizeUtil.parseFileSizeBytes("1XB")).isNull();
assertThatThrownBy(() ->
FileSizeUtil.parseFileSizeBytes("1XB")
).isInstanceOf(IllegalArgumentException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public GseV2ApiClient(MeterRegistry meterRegistry,
HttpHelperFactory.createHttpHelper(
15000,
15000,
15000,
60000,
1000,
2000,
60,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.tencent.bk.job.common.gse.v2.model;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
Expand Down Expand Up @@ -48,6 +49,7 @@ public static class AgentTask {
}

@Data
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class AtomicTask {
/**
* id 编号,在当前任务里面唯一,需要取大于等于0的值
Expand All @@ -59,15 +61,34 @@ public static class AtomicTask {
* 执行日志读取偏移量,单位byte
*/
private int offset;

/**
* 执行日志读取大小上限,单位byte
*/
private Long limit;
}

public void addAgentTaskQuery(ExecuteObjectGseKey executeObjectGseKey, Integer atomicTaskId, int offset) {
/**
* 新增 Agent 查询条件
*
* @param executeObjectGseKey 执行对象 GSE KEY
* @param atomicTaskId id 编号,在当前任务里面唯一,需要取大于等于0的值
* @param offset 执行日志读取偏移量,单位byte
* @param limit 执行日志读取大小上限,单位byte;传入 null 无效,表示不限制
*/
public void addAgentTaskQuery(ExecuteObjectGseKey executeObjectGseKey,
Integer atomicTaskId,
int offset,
Long limit) {
AgentTask agentTask = new AgentTask();
agentTask.setAgentId(executeObjectGseKey.getAgentId());
agentTask.setContainerId(executeObjectGseKey.getContainerId());
AtomicTask atomicTask = new AtomicTask();
atomicTask.setAtomicTaskId(atomicTaskId);
atomicTask.setOffset(offset);
if (limit != null && limit > 0) {
atomicTask.setLimit(limit);
}
agentTask.setAtomicTasks(Collections.singletonList(atomicTask));
agentTasks.add(agentTask);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.tencent.bk.job.backup.dao.impl;

import com.tencent.bk.job.execute.model.tables.GseFileAgentTask;
import com.tencent.bk.job.execute.model.tables.GseScriptAgentTask;
import com.tencent.bk.job.execute.model.tables.records.GseScriptAgentTaskRecord;
import org.jooq.DSLContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,11 @@ public class JobExecuteConfig {

@Value("${gse.script.rootPath:/tmp/bkjob}")
private String gseScriptFileRootPath;

/**
* GSE 脚本任务执行结果查询 API 单次返回的执行输出内容长度
* 默认值:512M
*/
@Value("${job.execute.scriptTask.query.contentSizeLimit:512MB}")
private String scriptTaskQueryContentSizeLimit;
}
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ protected void addResultHandleTask() {
fileExecuteObjectTaskService,
stepInstanceService,
gseClient,
jobExecuteConfig,
taskInstance,
stepInstance,
taskVariablesAnalyzeResult,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,7 @@ protected final void addResultHandleTask() {
scriptExecuteObjectTaskService,
stepInstanceService,
gseClient,
jobExecuteConfig,
taskInstance,
stepInstance,
taskVariablesAnalyzeResult,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.tencent.bk.job.common.util.FilePathUtils;
import com.tencent.bk.job.execute.common.constants.RunStatusEnum;
import com.tencent.bk.job.execute.config.FileDistributeConfig;
import com.tencent.bk.job.execute.config.JobExecuteConfig;
import com.tencent.bk.job.execute.engine.evict.TaskEvictPolicyExecutor;
import com.tencent.bk.job.execute.engine.listener.event.ResultHandleTaskResumeEvent;
import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteMQEventDispatcher;
Expand Down Expand Up @@ -97,6 +98,8 @@ public class ResultHandleResumeListener {
private final StepInstanceService stepInstanceService;
private final GseClient gseClient;

private final JobExecuteConfig jobExecuteConfig;

@Autowired
public ResultHandleResumeListener(TaskInstanceService taskInstanceService,
ResultHandleManager resultHandleManager,
Expand All @@ -111,7 +114,8 @@ public ResultHandleResumeListener(TaskInstanceService taskInstanceService,
ScriptExecuteObjectTaskService scriptExecuteObjectTaskService,
FileExecuteObjectTaskService fileExecuteObjectTaskService,
StepInstanceService stepInstanceService,
GseClient gseClient) {
GseClient gseClient,
JobExecuteConfig jobExecuteConfig) {
this.taskInstanceService = taskInstanceService;
this.resultHandleManager = resultHandleManager;
this.taskInstanceVariableService = taskInstanceVariableService;
Expand All @@ -126,6 +130,7 @@ public ResultHandleResumeListener(TaskInstanceService taskInstanceService,
this.fileExecuteObjectTaskService = fileExecuteObjectTaskService;
this.stepInstanceService = stepInstanceService;
this.gseClient = gseClient;
this.jobExecuteConfig = jobExecuteConfig;
}


Expand Down Expand Up @@ -191,6 +196,7 @@ private void resumeScriptTask(TaskInstanceDTO taskInstance,
scriptExecuteObjectTaskService,
stepInstanceService,
gseClient,
jobExecuteConfig,
taskInstance,
stepInstance,
taskVariablesAnalyzeResult,
Expand Down Expand Up @@ -240,6 +246,7 @@ private void resumeFileTask(TaskInstanceDTO taskInstance,
fileExecuteObjectTaskService,
stepInstanceService,
gseClient,
jobExecuteConfig,
taskInstance,
stepInstance,
taskVariablesAnalyzeResult,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,6 @@
@Data
@NoArgsConstructor
public class GseLogBatchPullResult<T> {
/**
* 是否成功拉取到日志
*/
private boolean success;
/**
* 错误信息
*/
private String errorMsg;
/**
* 是否最后一批
*/
Expand All @@ -50,10 +42,8 @@ public class GseLogBatchPullResult<T> {
*/
private GseTaskResult<T> gseTaskResult;

public GseLogBatchPullResult(boolean success, boolean isLastBatch, GseTaskResult<T> gseTaskResult, String errorMsg) {
this.success = success;
public GseLogBatchPullResult(boolean isLastBatch, GseTaskResult<T> gseTaskResult) {
this.lastBatch = isLastBatch;
this.gseTaskResult = gseTaskResult;
this.errorMsg = errorMsg;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.tencent.bk.job.common.util.date.DateUtils;
import com.tencent.bk.job.common.util.json.JsonUtils;
import com.tencent.bk.job.execute.common.constants.RunStatusEnum;
import com.tencent.bk.job.execute.config.JobExecuteConfig;
import com.tencent.bk.job.execute.engine.consts.ExecuteObjectTaskStatusEnum;
import com.tencent.bk.job.execute.engine.evict.TaskEvictPolicyExecutor;
import com.tencent.bk.job.execute.engine.listener.event.EventSource;
Expand Down Expand Up @@ -210,6 +211,7 @@ public abstract class AbstractResultHandleTask<T> implements ContinuousScheduled
*/
protected String gseTaskInfo;

protected JobExecuteConfig jobExecuteConfig;

protected AbstractResultHandleTask(TaskInstanceService taskInstanceService,
GseTaskService gseTaskService,
Expand All @@ -222,6 +224,7 @@ protected AbstractResultHandleTask(TaskInstanceService taskInstanceService,
ExecuteObjectTaskService executeObjectTaskService,
StepInstanceService stepInstanceService,
GseClient gseClient,
JobExecuteConfig jobExecuteConfig,
TaskInstanceDTO taskInstance,
StepInstanceDTO stepInstance,
TaskVariablesAnalyzeResult taskVariablesAnalyzeResult,
Expand All @@ -240,6 +243,7 @@ protected AbstractResultHandleTask(TaskInstanceService taskInstanceService,
this.executeObjectTaskService = executeObjectTaskService;
this.stepInstanceService = stepInstanceService;
this.gseClient = gseClient;
this.jobExecuteConfig = jobExecuteConfig;
this.requestId = requestId;
this.taskInstance = taskInstance;
this.taskInstanceId = taskInstance.getId();
Expand Down Expand Up @@ -317,11 +321,6 @@ private boolean pullGSEResultAndAnalyse(StopWatch watch) {
// 分批拉取GSE任务执行结果
gseLogBatchPullResult = pullGseTaskResultInBatches();

// 拉取结果校验
if (!checkPullResult(gseLogBatchPullResult)) {
return false;
}

// 检查任务异常并处理
GseTaskResult<T> gseTaskResult = gseLogBatchPullResult.getGseTaskResult();
if (determineTaskAbnormal(gseTaskResult)) {
Expand Down Expand Up @@ -503,20 +502,6 @@ private boolean checkEmptyGseResult(GseTaskResult<?> gseTaskResult) {
return isAbnormal;
}

private boolean checkPullResult(GseLogBatchPullResult<T> gseLogBatchPullResult) {
if (!gseLogBatchPullResult.isSuccess()) {
log.error("[{}] Pull gse task result error, errorMsg: {}", gseTaskInfo,
gseLogBatchPullResult.getErrorMsg());
this.executeResult = GseTaskExecuteResult.FAILED;
saveFailInfoForUnfinishedExecuteObjectTask(ExecuteObjectTaskStatusEnum.LOG_ERROR,
gseLogBatchPullResult.getErrorMsg());
finishGseTask(GseTaskExecuteResult.FAILED, true);
return false;
}
return true;
}


/**
* 设置目标gent任务结束状态
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.tencent.bk.job.common.util.ip.IpUtils;
import com.tencent.bk.job.common.util.json.JsonUtils;
import com.tencent.bk.job.execute.common.constants.FileDistStatusEnum;
import com.tencent.bk.job.execute.config.JobExecuteConfig;
import com.tencent.bk.job.execute.engine.consts.ExecuteObjectTaskStatusEnum;
import com.tencent.bk.job.execute.engine.evict.TaskEvictPolicyExecutor;
import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteMQEventDispatcher;
Expand Down Expand Up @@ -175,6 +176,7 @@ public FileResultHandleTask(TaskInstanceService taskInstanceService,
FileExecuteObjectTaskService fileExecuteObjectTaskService,
StepInstanceService stepInstanceService,
GseClient gseClient,
JobExecuteConfig jobExecuteConfig,
TaskInstanceDTO taskInstance,
StepInstanceDTO stepInstance,
TaskVariablesAnalyzeResult taskVariablesAnalyzeResult,
Expand All @@ -195,6 +197,7 @@ public FileResultHandleTask(TaskInstanceService taskInstanceService,
fileExecuteObjectTaskService,
stepInstanceService,
gseClient,
jobExecuteConfig,
taskInstance,
stepInstance,
taskVariablesAnalyzeResult,
Expand Down Expand Up @@ -267,10 +270,9 @@ GseLogBatchPullResult<FileTaskResult> pullGseTaskResultInBatches() {
FileTaskResult result = gseClient.getTransferFileResult(request);
GseLogBatchPullResult<FileTaskResult> pullResult;
if (result != null) {
pullResult = new GseLogBatchPullResult<>(
true, true, new FileGseTaskResult(result), null);
pullResult = new GseLogBatchPullResult<>(true, new FileGseTaskResult(result));
} else {
pullResult = new GseLogBatchPullResult<>(true, true, null, null);
pullResult = new GseLogBatchPullResult<>(true, null);
}
return pullResult;
}
Expand Down
Loading
Loading