Skip to content

Commit

Permalink
[Feature-13429][Remote Logging] Add support for writing task logs to …
Browse files Browse the repository at this point in the history
…Google Cloud Storage
  • Loading branch information
rickchengx committed Mar 22, 2023
1 parent 69e7449 commit edbcc17
Show file tree
Hide file tree
Showing 14 changed files with 287 additions and 15 deletions.
11 changes: 11 additions & 0 deletions docs/docs/en/guide/remote-logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,14 @@ remote.logging.s3.endpoint=<endpoint>
remote.logging.s3.region=<region>
```

## Writing task logs to [Google Cloud Storage (GCS)](https://cloud.google.com/storage)

Configure `common.properties` as follows:

```properties
# the location of the google cloud credential, required if you set remote.logging.target=GCS
remote.logging.google.cloud.storage.credential=/path/to/credential
# gcs bucket name, required if you set remote.logging.target=GCS
remote.logging.google.cloud.storage.bucket.name=<your-bucket>
```

13 changes: 12 additions & 1 deletion docs/docs/zh/guide/remote-logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Apache DolphinScheduler支持将任务日志传输到远端存储上。当配置
```properties
# 是否开启远程日志存储
remote.logging.enable=true
# 任务日志写入的远端存储,目前支持OSS, S3
# 任务日志写入的远端存储,目前支持OSS, S3, GCS
remote.logging.target=OSS
# 任务日志在远端存储上的目录
remote.logging.base.dir=logs
Expand Down Expand Up @@ -50,3 +50,14 @@ remote.logging.s3.endpoint=<endpoint>
remote.logging.s3.region=<region>
```

## 将任务日志写入[Google Cloud Storage (GCS)](https://cloud.google.com/storage)

配置`common.propertis`如下:

```properties
# the location of the google cloud credential, required if you set remote.logging.target=GCS
remote.logging.google.cloud.storage.credential=/path/to/credential
# gcs bucket name, required if you set remote.logging.target=GCS
remote.logging.google.cloud.storage.bucket.name=<your-bucket>
```

1 change: 1 addition & 0 deletions dolphinscheduler-api/src/main/resources/logback-spring.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

<configuration scan="true" scanPeriod="120 seconds">
<property name="log.base" value="logs"/>
<property scope="context" name="log.base.ctx" value="${log.base}" />

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
Expand Down
27 changes: 27 additions & 0 deletions dolphinscheduler-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,32 @@
<version>6.1.26</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
<exclusions>
<exclusion>
<groupId>org.codehaus.mojo</groupId>
<artifactId>animal-sniffer-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>io.perfmark</groupId>
<artifactId>perfmark-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-common-protos</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -845,4 +845,11 @@ private Constants() {
public static final String REMOTE_LOGGING_S3_ENDPOINT = "remote.logging.s3.endpoint";

public static final String REMOTE_LOGGING_S3_REGION = "remote.logging.s3.region";

/**
* remote logging for GCS
*/
public static final String REMOTE_LOGGING_GCS_CREDENTIAL = "remote.logging.google.cloud.storage.credential";

public static final String REMOTE_LOGGING_GCS_BUCKET_NAME = "remote.logging.google.cloud.storage.bucket.name";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.common.log.remote;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;

import org.apache.commons.lang3.StringUtils;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

import lombok.extern.slf4j.Slf4j;

import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

@Slf4j
public class GcsRemoteLogHandler implements RemoteLogHandler, Closeable {

private Storage gcsStorage;

private String bucketName;

private String credential;

private static GcsRemoteLogHandler instance;

private GcsRemoteLogHandler() {

}

public static synchronized GcsRemoteLogHandler getInstance() {
if (instance == null) {
instance = new GcsRemoteLogHandler();
instance.init();
}

return instance;
}

public void init() {
try {
credential = readCredentials();
bucketName = readBucketName();
gcsStorage = buildGcsStorage(credential);

checkBucketNameExists(bucketName);
} catch (IOException e) {
log.error("GCS Remote Log Handler init failed", e);
}
}

@Override
public void close() throws IOException {
try {
if (gcsStorage != null) {
gcsStorage.close();
}
} catch (Exception e) {
throw new IOException(e);
}
}

@Override
public void sendRemoteLog(String logPath) {
String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath);

try {
log.info("send remote log {} to GCS {}", logPath, objectName);

BlobInfo blobInfo = BlobInfo.newBuilder(
BlobId.of(bucketName, objectName)).build();

gcsStorage.create(blobInfo, Files.readAllBytes(Paths.get(logPath)));
} catch (Exception e) {
log.error("error while sending remote log {} to GCS {}", logPath, objectName, e);
}
}

@Override
public void getRemoteLog(String logPath) {
String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath);

try {
log.info("get remote log on GCS {} to {}", objectName, logPath);

File dstFile = new File(logPath);
if (dstFile.isDirectory()) {
Files.delete(dstFile.toPath());
} else {
Files.createDirectories(dstFile.getParentFile().toPath());
}

Blob blob = gcsStorage.get(BlobId.of(bucketName, objectName));
blob.downloadTo(Paths.get(logPath));
} catch (Exception e) {
log.error("error while getting remote log on GCS {} to {}", objectName, logPath, e);
}
}

protected Storage buildGcsStorage(String credential) throws IOException {
return StorageOptions.newBuilder()
.setCredentials(ServiceAccountCredentials.fromStream(
Files.newInputStream(Paths.get(credential))))
.build()
.getService();
}

protected String readCredentials() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_GCS_CREDENTIAL);
}

protected String readBucketName() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_GCS_BUCKET_NAME);
}

public void checkBucketNameExists(String bucketName) {
if (StringUtils.isBlank(bucketName)) {
throw new IllegalArgumentException(Constants.REMOTE_LOGGING_GCS_BUCKET_NAME + " is blank");
}

boolean exist = false;
for (Bucket bucket : gcsStorage.list().iterateAll()) {
if (bucketName.equals(bucket.getName())) {
exist = true;
break;
}
}

if (!exist) {
throw new IllegalArgumentException(
"bucketName: " + bucketName + " is not exists, you need to create them by yourself");
} else {
log.info("bucketName: {} has been found", bucketName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public RemoteLogHandler getRemoteLogHandler() {
return OssRemoteLogHandler.getInstance();
} else if ("S3".equals(target)) {
return S3RemoteLogHandler.getInstance();
} else if ("GCS".equals(target)) {
return GcsRemoteLogHandler.getInstance();
}

log.error("No suitable remote logging target for {}", target);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.dolphinscheduler.common.log.remote;

import static org.apache.dolphinscheduler.common.utils.LogUtils.getLocalLogBaseDir;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;

Expand All @@ -36,8 +38,6 @@ public class RemoteLogUtils {

private static RemoteLogService remoteLogService;

private static final int OBJECT_NAME_COUNT = 2;

@Autowired
private RemoteLogService autowiredRemoteLogService;

Expand Down Expand Up @@ -79,16 +79,13 @@ public static boolean isRemoteLoggingEnable() {
}

public static String getObjectNameFromLogPath(String logPath) {
Path localLogBaseDirPath = Paths.get(getLocalLogBaseDir()).toAbsolutePath();

Path path = Paths.get(logPath);
int nameCount = path.getNameCount();

String logBaseDir = PropertyUtils.getString(Constants.REMOTE_LOGGING_BASE_DIR);

if (nameCount < OBJECT_NAME_COUNT) {
return Paths.get(logBaseDir, logPath).toString();
} else {
return Paths.get(logBaseDir, path.subpath(nameCount - OBJECT_NAME_COUNT, nameCount).toString())
.toString();
}
String remoteLogBaseDir = PropertyUtils.getString(Constants.REMOTE_LOGGING_BASE_DIR);
return Paths.get(remoteLogBaseDir, path.subpath(localLogBaseDirPath.getNameCount(), nameCount).toString())
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.common.utils;

import lombok.extern.slf4j.Slf4j;

import org.slf4j.LoggerFactory;

import ch.qos.logback.classic.LoggerContext;

@Slf4j
public class LogUtils {

public static String getLocalLogBaseDir() {
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
return loggerContext.getProperty("log.base.ctx");
}

}
4 changes: 4 additions & 0 deletions dolphinscheduler-common/src/main/resources/common.properties
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,8 @@ remote.logging.s3.bucket.name=<bucket.name>
remote.logging.s3.endpoint=<endpoint>
# s3 region, required if you set remote.logging.target=S3
remote.logging.s3.region=<region>
# the location of the google cloud credential, required if you set remote.logging.target=GCS
remote.logging.google.cloud.storage.credential=/path/to/credential
# gcs bucket name, required if you set remote.logging.target=GCS
remote.logging.google.cloud.storage.bucket.name=<your-bucket>

Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,42 @@
package org.apache.dolphinscheduler.common.log.remote;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.LogUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;

import java.nio.file.Path;
import java.nio.file.Paths;

import lombok.extern.slf4j.Slf4j;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@Slf4j
@ExtendWith(MockitoExtension.class)
public class RemoteLogHandlerTest {

@Test
public void testGetObjectNameFromLogPath() {
final String logPath = "/path/to/dolphinscheduler/logs/20230116/8245922982496_1-1-3.log";
final String expectedObjectName = "logs/20230116/8245922982496_1-1-3.log";
Path currentRelativePath = Paths.get("");
String currentDir = currentRelativePath.toAbsolutePath().toString();
final String logPath = currentDir + "/logs/20230116/8245922982496/1/1/1.log";
log.info("logPath is: {}", logPath);

try (MockedStatic<PropertyUtils> propertyUtilsMockedStatic = Mockito.mockStatic(PropertyUtils.class)) {
final String expectedObjectName = "logs/20230116/8245922982496/1/1/1.log";

try (
MockedStatic<PropertyUtils> propertyUtilsMockedStatic = Mockito.mockStatic(PropertyUtils.class);
MockedStatic<LogUtils> remoteLogUtilsMockedStatic = Mockito.mockStatic(LogUtils.class)) {
propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_BASE_DIR))
.thenReturn("logs");
remoteLogUtilsMockedStatic.when(LogUtils::getLocalLogBaseDir).thenReturn("logs");

String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath);

Assertions.assertEquals(expectedObjectName, objectName);
}
}
Expand Down
Loading

0 comments on commit edbcc17

Please sign in to comment.