Skip to content

Commit

Permalink
[Feature-13331][Remote Logging] Add support for writing task logs to OSS
Browse files Browse the repository at this point in the history
  • Loading branch information
rickchengx committed Jan 4, 2023
1 parent ccad56e commit 952fff4
Show file tree
Hide file tree
Showing 9 changed files with 334 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -806,4 +806,21 @@ private Constants() {
public static final Integer QUERY_ALL_ON_PROJECT = 1;
public static final Integer QUERY_ALL_ON_WORKFLOW = 2;
public static final Integer QUERY_ALL_ON_TASK = 3;

/**
* remote logging
*/
public static final String REMOTE_LOGGING_ENABLE = "remote.logging.enable";

public static final String REMOTE_LOGGING_TARGET = "remote.logging.target";

public static final String REMOTE_LOGGING_OSS_ACCESS_KEY_ID = "remote.logging.oss.access.key.id";

public static final String REMOTE_LOGGING_OSS_ACCESS_KEY_SECRET = "remote.logging.oss.access.key.secret";

public static final String REMOTE_LOGGING_OSS_BUCKET_NAME = "remote.logging.oss.bucket.name";

public static final String REMOTE_LOGGING_OSS_ENDPOINT = "remote.logging.oss.endpoint";

public static final String REMOTE_LOGGING_OSS_BASE_DIR = "remote.logging.oss.base.dir";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.common.log.remote;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.factory.OssClientFactory;
import org.apache.dolphinscheduler.common.model.OssConnection;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;

import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.aliyun.oss.OSS;
import com.aliyun.oss.model.GetObjectRequest;
import com.aliyun.oss.model.PutObjectRequest;

public class OssRemoteLogHandler implements RemoteLogHandler {

private static final Logger logger = LoggerFactory.getLogger(OssRemoteLogHandler.class);
@Override
public void sendRemoteLog(String logPath) {
String accessKeyId = readOssAccessKeyId();
String accessKeySecret = readOssAccessKeySecret();
String endpoint = readOssEndpoint();
OSS ossClient = OssClientFactory.buildOssClient(new OssConnection(accessKeyId, accessKeySecret, endpoint));

String bucketName = readOssBucketName();
String objectName = Paths.get(readOssBaseDir(), getObjectNameFromLogPath(logPath)).toString();

try {
logger.info("send remote log {} to OSS {}", logPath, objectName);
PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, new File(logPath));
ossClient.putObject(putObjectRequest);
} catch (Exception e) {
logger.error("error while sending remote log {} to OSS {}", logPath, objectName, e);
}
}

@Override
public void getRemoteLog(String logPath) {
String accessKeyId = readOssAccessKeyId();
String accessKeySecret = readOssAccessKeySecret();
String endpoint = readOssEndpoint();
OSS ossClient = OssClientFactory.buildOssClient(new OssConnection(accessKeyId, accessKeySecret, endpoint));

String bucketName = readOssBucketName();
String objectName = Paths.get(readOssBaseDir(), getObjectNameFromLogPath(logPath)).toString();

try {
logger.info("get remote log on OSS {} to {}", objectName, logPath);
ossClient.getObject(new GetObjectRequest(bucketName, objectName), new File(logPath));
} catch (Exception e) {
logger.error("error while getting remote log on OSS {} to {}", objectName, logPath);
}
}

private String getObjectNameFromLogPath(String logPath) {
Path path = Paths.get(logPath);
int nameCount = path.getNameCount();
if (nameCount < 2) {
return logPath;
} else {
return path.subpath(nameCount - 2, nameCount).toString();
}
}

private String readOssAccessKeyId() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_OSS_ACCESS_KEY_ID);
}

private String readOssAccessKeySecret() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_OSS_ACCESS_KEY_SECRET);
}

private String readOssEndpoint() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_OSS_ENDPOINT);
}

private String readOssBucketName() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_OSS_BUCKET_NAME);
}

private String readOssBaseDir() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_OSS_BASE_DIR);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.common.log.remote;

public interface RemoteLogHandler {

void sendRemoteLog(String logPath);

void getRemoteLog(String logPath);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.common.log.remote;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;

import lombok.experimental.UtilityClass;

@UtilityClass
public class RemoteLogHandlerFactory {

public RemoteLogHandler getRemoteLogHandler() {
if ("true".equalsIgnoreCase(PropertyUtils.getString(Constants.REMOTE_LOGGING_ENABLE))) {
if ("OSS".equalsIgnoreCase(PropertyUtils.getString(Constants.REMOTE_LOGGING_TARGET))) {
return new OssRemoteLogHandler();
} else {
return null;
}
} else {
return null;
}
}
}
17 changes: 16 additions & 1 deletion dolphinscheduler-common/src/main/resources/common.properties
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,19 @@ ml.mlflow.preset_repository=https://github.com/apache/dolphinscheduler-mlflow
ml.mlflow.preset_repository_version="main"

# way to collect applicationId: log(original regex match), aop
appId.collect: log
appId.collect: log

# remote logging
remote.logging.enable=false
# if remote.logging.enable = true, set the target of remote logging
remote.logging.target=OSS
# oss access key id, required if you set remote.logging.target=OSS
remote.logging.oss.access.key.id=<access.key.id>
# oss access key secret, required if you set remote.logging.target=OSS
remote.logging.oss.access.key.secret=<access.key.secret>
# oss bucket name, required if you set remote.logging.target=OSS
remote.logging.oss.bucket.name=<bucket.name>
# oss endpoint, required if you set remote.logging.target=OSS
remote.logging.oss.endpoint=<endpoint>
# oss base directory, required if you set remote.logging.target=OSS
remote.logging.oss.base.dir=logs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.log.remote.RemoteLogHandler;
import org.apache.dolphinscheduler.common.log.remote.RemoteLogHandlerFactory;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
Expand Down Expand Up @@ -462,6 +465,8 @@ public void taskFinished(TaskInstance taskInstance) throws StateEventHandleExcep
taskInstance.getTaskCode(),
taskInstance.getState());
this.updateProcessInstanceState();

sendRemoteLogIfNeeded(taskInstance);
} catch (Exception ex) {
logger.error("Task finish failed, get a exception, will remove this taskInstance from completeTaskMap", ex);
// remove the task from complete map, so that we can finish in the next time.
Expand Down Expand Up @@ -2179,4 +2184,22 @@ private enum WorkflowRunnableStatus {

}

private void sendRemoteLogIfNeeded(TaskInstance taskInstance) {
if ("true".equalsIgnoreCase(PropertyUtils.getString(Constants.REMOTE_LOGGING_ENABLE))) {
if (taskInstance.getHost().endsWith(masterAddress.split(":")[1])) {
try {
logger.info("Start to send master's log {} to remote target {}", taskInstance.getLogPath(),
PropertyUtils.getString(Constants.REMOTE_LOGGING_TARGET));

RemoteLogHandler remoteLogHandler = RemoteLogHandlerFactory.getRemoteLogHandler();
remoteLogHandler.sendRemoteLog(taskInstance.getLogPath());

logger.info("Succeed to send master's log {} to remote target {}", taskInstance.getLogPath(),
PropertyUtils.getString(Constants.REMOTE_LOGGING_TARGET));
} catch (Exception e) {
logger.error("send master's log {} to remote target error", taskInstance.getLogPath(), e);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import static org.apache.dolphinscheduler.common.constants.Constants.APPID_COLLECT;
import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COLLECT_WAY;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.log.remote.RemoteLogHandler;
import org.apache.dolphinscheduler.common.log.remote.RemoteLogHandlerFactory;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
Expand Down Expand Up @@ -82,7 +85,7 @@ public void process(Channel channel, Command command) {
ViewLogRequestCommand viewLogRequest = JSONUtils.parseObject(
command.getBody(), ViewLogRequestCommand.class);
String viewLogPath = viewLogRequest.getPath();
String msg = LogUtils.readWholeFileContent(viewLogPath);
String msg = readWholeFileContent(viewLogPath);
ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg);
channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque()));
break;
Expand Down Expand Up @@ -157,7 +160,7 @@ public void process(Channel channel, Command command) {
* @param filePath file path
* @return byte array of file
*/
private byte[] getFileContentBytes(String filePath) {
private byte[] getFileContentBytesFromLocal(String filePath) {
try (
InputStream in = new FileInputStream(filePath);
ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
Expand All @@ -173,6 +176,30 @@ private byte[] getFileContentBytes(String filePath) {
return new byte[0];
}

private byte[] getFileContentBytesFromRemote(String filePath) {
RemoteLogHandler remoteLogHandler = RemoteLogHandlerFactory.getRemoteLogHandler();
if (remoteLogHandler != null) {
remoteLogHandler.getRemoteLog(filePath);
} else {
logger.error("remote log handler is null");
}

return getFileContentBytesFromLocal(filePath);
}

private byte[] getFileContentBytes(String filePath) {
File file = new File(filePath);
if (file.exists()) {
return getFileContentBytesFromLocal(filePath);
} else {
if ("true".equalsIgnoreCase(PropertyUtils.getString(Constants.REMOTE_LOGGING_ENABLE))) {
return getFileContentBytesFromRemote(filePath);
} else {
return getFileContentBytesFromLocal(filePath);
}
}
}

/**
* read part file content,can skip any line and read some lines
*
Expand All @@ -181,9 +208,9 @@ private byte[] getFileContentBytes(String filePath) {
* @param limit read lines limit
* @return part file content
*/
private List<String> readPartFileContent(String filePath,
int skipLine,
int limit) {
private List<String> readPartFileContentFromLocal(String filePath,
int skipLine,
int limit) {
File file = new File(filePath);
if (file.exists() && file.isFile()) {
try (Stream<String> stream = Files.lines(Paths.get(filePath))) {
Expand All @@ -197,4 +224,56 @@ private List<String> readPartFileContent(String filePath,
return Collections.emptyList();
}

private List<String> readPartFileContentFromRemote(String filePath,
int skipLine,
int limit) {
RemoteLogHandler remoteLogHandler = RemoteLogHandlerFactory.getRemoteLogHandler();
if (remoteLogHandler != null) {
remoteLogHandler.getRemoteLog(filePath);
} else {
logger.error("remote log handler is null");
}

return readPartFileContentFromLocal(filePath, skipLine, limit);
}

private List<String> readPartFileContent(String filePath,
int skipLine,
int limit) {
File file = new File(filePath);
if (file.exists()) {
return readPartFileContentFromLocal(filePath, skipLine, limit);
} else {
if ("true".equalsIgnoreCase(PropertyUtils.getString(Constants.REMOTE_LOGGING_ENABLE))) {
return readPartFileContentFromRemote(filePath, skipLine, limit);
} else {
return readPartFileContentFromLocal(filePath, skipLine, limit);
}
}
}

private String readWholeFileContentFromRemote(String filePath) {
RemoteLogHandler remoteLogHandler = RemoteLogHandlerFactory.getRemoteLogHandler();
if (remoteLogHandler != null) {
remoteLogHandler.getRemoteLog(filePath);
} else {
logger.error("remote log handler is null");
}

return LogUtils.readWholeFileContentFromLocal(filePath);
}

private String readWholeFileContent(String filePath) {
File file = new File(filePath);
if (file.exists()) {
return LogUtils.readWholeFileContentFromLocal(filePath);
} else {
if ("true".equalsIgnoreCase(PropertyUtils.getString(Constants.REMOTE_LOGGING_ENABLE))) {
return readWholeFileContentFromRemote(filePath);
} else {
return LogUtils.readWholeFileContentFromLocal(filePath);
}
}
}

}
Loading

0 comments on commit 952fff4

Please sign in to comment.