Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
rickchengx committed Feb 10, 2023
1 parent 475906d commit b70bed3
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,16 @@
import java.nio.file.Path;
import java.nio.file.Paths;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;

import com.aliyun.oss.OSS;
import com.aliyun.oss.model.Bucket;
import com.aliyun.oss.model.GetObjectRequest;
import com.aliyun.oss.model.PutObjectRequest;

@Slf4j
public class OssRemoteLogHandler implements RemoteLogHandler, Closeable {

private static final Logger logger = LoggerFactory.getLogger(OssRemoteLogHandler.class);

private static final int OBJECT_NAME_COUNT = 2;

private OSS ossClient;
Expand All @@ -66,23 +64,24 @@ public void sendRemoteLog(String logPath) {
String objectName = getObjectNameFromLogPath(logPath);

try {
logger.info("send remote log {} to OSS {}", logPath, objectName);
log.info("send remote log {} to OSS {}", logPath, objectName);
PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, new File(logPath));
ossClient.putObject(putObjectRequest);
} catch (Exception e) {
logger.error("error while sending remote log {} to OSS {}", logPath, objectName, e);
log.error("error while sending remote log {} to OSS {}", logPath, objectName, e);
}
}

@Override
public void getRemoteLog(String logPath) {
String objectName = getObjectNameFromLogPath(logPath);
mkdirOfLog(logPath);

try {
logger.info("get remote log on OSS {} to {}", objectName, logPath);
log.info("get remote log on OSS {} to {}", objectName, logPath);
ossClient.getObject(new GetObjectRequest(bucketName, objectName), new File(logPath));
} catch (Exception e) {
logger.error("error while getting remote log on OSS {} to {}", objectName, logPath, e);
log.error("error while getting remote log on OSS {} to {}", objectName, logPath, e);
}
}

Expand All @@ -105,6 +104,11 @@ private String getObjectNameFromLogPath(String logPath) {
}
}

private void mkdirOfLog(String logPath) {
Path directory = Paths.get(logPath).getParent();
directory.toFile().mkdirs();
}

private void checkBucketNameExists(String bucketName) {
if (StringUtils.isBlank(bucketName)) {
throw new IllegalArgumentException(Constants.REMOTE_LOGGING_OSS_BUCKET_NAME + " is empty");
Expand All @@ -120,7 +124,7 @@ private void checkBucketNameExists(String bucketName) {
"bucketName: " + bucketName + " does not exist, you need to create them by yourself");
});

logger.info("bucketName: {} has been found", existsBucket.getName());
log.info("bucketName: {} has been found", existsBucket.getName());
}

private String readOssAccessKeyId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,14 @@
public class RemoteLogHandlerFactory {

public RemoteLogHandler getRemoteLogHandler() {
if (RemoteLogUtils.isRemoteLoggingEnable()) {
if ("OSS".equals(PropertyUtils.getUpperCaseString(Constants.REMOTE_LOGGING_TARGET))) {
OssRemoteLogHandler ossRemoteLogHandler = new OssRemoteLogHandler();
ossRemoteLogHandler.init();
return ossRemoteLogHandler;
} else {
return null;
}
} else {
if (!RemoteLogUtils.isRemoteLoggingEnable()) {
return null;
}
if (!"OSS".equals(PropertyUtils.getUpperCaseString(Constants.REMOTE_LOGGING_TARGET))) {
return null;
}
OssRemoteLogHandler ossRemoteLogHandler = new OssRemoteLogHandler();
ossRemoteLogHandler.init();
return ossRemoteLogHandler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,28 @@
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class RemoteLogService {

private static final Logger logger = LoggerFactory.getLogger(RemoteLogService.class);

@Async("remoteLogHandleExecutor")
public void asyncSendRemoteLog(String logPath) {
if (RemoteLogUtils.isRemoteLoggingEnable()) {
logger.info("Start to send log {} to remote target {}", logPath,
log.info("Start to send log {} to remote target {}", logPath,
PropertyUtils.getString(Constants.REMOTE_LOGGING_TARGET));

RemoteLogHandler remoteLogHandler = RemoteLogHandlerFactory.getRemoteLogHandler();
if (remoteLogHandler != null) {
remoteLogHandler.sendRemoteLog(logPath);
} else {
logger.error("remote log handler is null");
if (remoteLogHandler == null) {
log.error("remote log handler is null");
return;
}

logger.info("Succeed to send log {} to remote target {}", logPath,
remoteLogHandler.sendRemoteLog(logPath);
log.info("Succeed to send log {} to remote target {}", logPath,
PropertyUtils.getString(Constants.REMOTE_LOGGING_TARGET));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,15 @@

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class RemoteLogUtils {

private static final Logger logger = LoggerFactory.getLogger(RemoteLogUtils.class);

private static RemoteLogService remoteLogService;

@Autowired
Expand All @@ -51,17 +50,16 @@ public static void sendRemoteLog(String logPath) {

public static void getRemoteLog(String logPath) {
if (isRemoteLoggingEnable()) {
logger.info("Start to get log {} from remote target {}", logPath,
log.info("Start to get log {} from remote target {}", logPath,
PropertyUtils.getString(Constants.REMOTE_LOGGING_TARGET));

RemoteLogHandler remoteLogHandler = RemoteLogHandlerFactory.getRemoteLogHandler();
if (remoteLogHandler != null) {
remoteLogHandler.getRemoteLog(logPath);
} else {
logger.error("remote log handler is null");
if (remoteLogHandler == null) {
log.error("remote log handler is null");
return;
}

logger.info("Succeed to get log {} from remote target {}", logPath,
remoteLogHandler.getRemoteLog(logPath);
log.info("End get log {} from remote target {}", logPath,
PropertyUtils.getString(Constants.REMOTE_LOGGING_TARGET));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,11 @@ private byte[] getFileContentBytes(String filePath) {
File file = new File(filePath);
if (file.exists()) {
return getFileContentBytesFromLocal(filePath);
} else {
if (RemoteLogUtils.isRemoteLoggingEnable()) {
return getFileContentBytesFromRemote(filePath);
} else {
return getFileContentBytesFromLocal(filePath);
}
}
if (RemoteLogUtils.isRemoteLoggingEnable()) {
return getFileContentBytesFromRemote(filePath);
}
return getFileContentBytesFromLocal(filePath);
}

/**
Expand Down Expand Up @@ -228,13 +226,11 @@ private List<String> readPartFileContent(String filePath,
File file = new File(filePath);
if (file.exists()) {
return readPartFileContentFromLocal(filePath, skipLine, limit);
} else {
if (RemoteLogUtils.isRemoteLoggingEnable()) {
return readPartFileContentFromRemote(filePath, skipLine, limit);
} else {
return readPartFileContentFromLocal(filePath, skipLine, limit);
}
}
if (RemoteLogUtils.isRemoteLoggingEnable()) {
return readPartFileContentFromRemote(filePath, skipLine, limit);
}
return readPartFileContentFromLocal(filePath, skipLine, limit);
}

private String readWholeFileContentFromRemote(String filePath) {
Expand All @@ -246,13 +242,11 @@ private String readWholeFileContent(String filePath) {
File file = new File(filePath);
if (file.exists()) {
return LogUtils.readWholeFileContentFromLocal(filePath);
} else {
if (RemoteLogUtils.isRemoteLoggingEnable()) {
return readWholeFileContentFromRemote(filePath);
} else {
return LogUtils.readWholeFileContentFromLocal(filePath);
}
}
if (RemoteLogUtils.isRemoteLoggingEnable()) {
return readWholeFileContentFromRemote(filePath);
}
return LogUtils.readWholeFileContentFromLocal(filePath);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
this.logBuffer = new LinkedBlockingQueue<>();

if (this.taskRequest != null) {
this.taskRequest.setLogHandleEnable(true);
// set logBufferEnable=true if the task uses logHandler and logBuffer to buffer log messages
this.taskRequest.setLogBufferEnable(true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,5 +269,5 @@ public class TaskExecutionContext implements Serializable {
*/
private int testFlag;

private boolean logHandleEnable;
private boolean logBufferEnable;
}
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ protected void sendTaskResult() {
}

protected void sendTaskLogOnWorkerToRemoteIfNeeded() {
if (taskExecutionContext.isLogHandleEnable()) {
if (taskExecutionContext.isLogBufferEnable()) {
return;
}

Expand Down

0 comments on commit b70bed3

Please sign in to comment.