Skip to content

Commit

Permalink
Optimization of log collector module (#60)
Browse files Browse the repository at this point in the history
* Add keyword filter strategy in log4j/log4j2.

* Use the stream log event timestamp as the enqueue time.

* Init the structure of supporting multiple disks/drivers.

* Load balancer strategies for storing stream log.

* Adjust the default value of configuration.

* Add test unit for job log storage.

* optimize the restful performance.

* Add debug mode in log collector; Add the strategy to discard log cache.

* Fix the problem in closing log4j bucket.

* Optimize the strategies.
  • Loading branch information
Davidhua1996 authored Dec 20, 2022
1 parent 0226450 commit 4052f2f
Show file tree
Hide file tree
Showing 47 changed files with 2,103 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.StreamisLogAppenderConfig;
import com.webank.wedatasphere.streamis.jobmanager.log.collector.log4j2.StreamisLog4j2AppenderConfig;
import com.webank.wedatasphere.streamis.jobmanager.log.collector.log4j2.filters.KeywordThresholdFilter;
import com.webank.wedatasphere.streamis.jobmanager.plugin.StreamisConfigAutowired;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -56,20 +57,26 @@ public StreamisLogAppenderConfig logAppenderConfig(StreamisLogAppenderConfig.Bui
if ("LevelMatch".equals(filterStrategy)) {
((StreamisLog4j2AppenderConfig.Builder)builder).withFilter(LevelMatchFilter.newBuilder().setOnMatch(Filter.Result.ACCEPT).setOnMismatch(Filter.Result.DENY)
.setLevel(Level.getLevel(this.configuration.getString(LOG_FILTER_LEVEL_MATCH))).build());
} else if ("ThresholdFilter".equals(filterStrategy)) {
} else if ("ThresholdMatch".equals(filterStrategy)) {
((StreamisLog4j2AppenderConfig.Builder)builder).withFilter(ThresholdFilter.createFilter(Level
.getLevel(this.configuration.getString(LOG_FILTER_THRESHOLD_MATCH)), Filter.Result.ACCEPT, Filter.Result.DENY));
} else if ("RegexMatch".equals(filterStrategy)) {
((StreamisLog4j2AppenderConfig.Builder)builder).withFilter(RegexFilter.createFilter(this.configuration.getString(LOG_FILTER_REGEX),
null, true, Filter.Result.ACCEPT, Filter.Result.DENY));
} else if ("Keyword".equals(filterStrategy)){
((StreamisLog4j2AppenderConfig.Builder)builder).withFilter(
new KeywordThresholdFilter(
StringUtils.split(this.configuration.getString(LOG_FILTER_KEYWORDS), ","),
StringUtils.split(this.configuration.getString(LOG_FILTER_KEYWORDS_EXCLUDE), ",")));
}
}
}
String hadoopUser = EnvironmentInformation.getHadoopUser();
if (hadoopUser.equals("<no hadoop dependency found>") || hadoopUser.equals("<unknown>")){
hadoopUser = System.getProperty("user.name");
}
return builder.setRpcConnTimeout(this.configuration.getInteger(LOG_RPC_CONN_TIMEOUT))
return builder.setDebugMode(this.configuration.getBoolean(DEBUG_MODE))
.setRpcConnTimeout(this.configuration.getInteger(LOG_RPC_CONN_TIMEOUT))
.setRpcSocketTimeout(this.configuration.getInteger(LOG_RPC_SOCKET_TIMEOUT))
.setRpcSendRetryCnt(this.configuration.getInteger(LOG_RPC_SEND_RETRY_COUNT))
.setRpcServerRecoveryTimeInSec(this.configuration.getInteger(LOG_RPC_SERVER_RECOVERY_TIME))
Expand All @@ -81,10 +88,13 @@ public StreamisLogAppenderConfig logAppenderConfig(StreamisLogAppenderConfig.Bui
hadoopUser))
.setRpcCacheSize(this.configuration.getInteger(LOG_RPC_CACHE_SIZE))
.setRpcCacheMaxConsumeThread(this.configuration.getInteger(LOG_PRC_CACHE_MAX_CONSUME_THREAD))
.setDiscard(this.configuration.getBoolean(LOG_RPC_CACHE_DISCARD))
.setDiscardWindow(this.configuration.getInteger(LOG_RPC_CACHE_DISCARD_WINDOW))
.setRpcBufferSize(this.configuration.getInteger(LOG_RPC_BUFFER_SIZE))
.setRpcBufferExpireTimeInSec(this.configuration.getInteger(LOG_RPC_BUFFER_EXPIRE_TIME)).build();
}


/**
* According to :
* String launchCommand =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import scala.Int;

import java.util.List;

Expand Down Expand Up @@ -86,8 +87,19 @@ public class FlinkStreamisConfigDefine {
* Max cache consume threads in log RPC module
*/
public static final ConfigOption<Integer> LOG_PRC_CACHE_MAX_CONSUME_THREAD = ConfigOptions.key("stream.log.rpc.cache.max-consume-thread")
.intType().defaultValue(10).withDescription("Max cache consume threads in log RPC module");
.intType().defaultValue(2).withDescription("Max cache consume threads in log RPC module");

/**
* If discard the useless log
*/
public static final ConfigOption<Boolean> LOG_RPC_CACHE_DISCARD = ConfigOptions.key("stream.log.rpc.cache.discard")
.booleanType().defaultValue(true).withDescription("If discard the useless log");

/**
* The window size of discarding
*/
public static final ConfigOption<Integer> LOG_RPC_CACHE_DISCARD_WINDOW = ConfigOptions.key("stream.log.rpc.cache.discard-window")
.intType().defaultValue(2).withDescription("The window size of discarding");
/**
* Buffer size in log RPC module
*/
Expand All @@ -104,7 +116,7 @@ public class FlinkStreamisConfigDefine {
* Log filter strategy list
*/
public static final ConfigOption<List<String>> LOG_FILTER_STRATEGIES = ConfigOptions.key("stream.log.filter.strategies")
.stringType().asList().defaultValues("LevelMatch").withDescription("Log filter strategy list");
.stringType().asList().defaultValues("Keyword").withDescription("Log filter strategy list");

/**
* Level value of LevelMatch filter strategy
Expand All @@ -122,4 +134,22 @@ public class FlinkStreamisConfigDefine {
*/
public static final ConfigOption<String> LOG_FILTER_REGEX = ConfigOptions.key("stream.log.filter.regex.value")
.stringType().defaultValue(".*").withDescription("Regex value of RegexMatch filter strategy");

/**
* Accept keywords of Keyword filter strategy
*/
public static final ConfigOption<String> LOG_FILTER_KEYWORDS = ConfigOptions.key("stream.log.filter.keywords")
.stringType().defaultValue("ERROR").withDescription("Accept keywords of Keyword filter strategy");

/**
* Exclude keywords of Keyword filter strategy
*/
public static final ConfigOption<String> LOG_FILTER_KEYWORDS_EXCLUDE = ConfigOptions.key("stream.log.filter.keywords.exclude")
.stringType().defaultValue("").withDescription("Exclude keywords of Keyword filter strategy");

/**
* Debug mode
*/
public static final ConfigOption<Boolean> DEBUG_MODE = ConfigOptions.key("stream.log.debug")
.booleanType().defaultValue(false).withDescription("Debug mode");
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Enumeration;
import java.util.Objects;
import java.util.Properties;

public class FlinkConfigurationLoadTest {
private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigurationLoadTest.class);
@Test
public void loadConfiguration() {
String configDir = Objects.requireNonNull(FlinkConfigurationLoadTest.class.getResource("/")).getFile();
Expand All @@ -21,4 +24,5 @@ public void loadConfiguration() {
}
GlobalConfiguration.loadConfiguration(configDir, dynamicConfiguration);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.webank.wedatasphere.streamis.jobmanager.log.collector.flink.FlinkStreamisConfigAutowired
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2019 WeBank
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

<configuration status="error" monitorInterval="30">
<appenders>
<StreamRpcLog name="StreamRpcLog" appName="stream_application">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n"/>
<RpcLogSender sendRetryCnt="3" connectionTimeout="3000"
socketTimeout="20000" serverRecoveryTimeInSec="5" maxDelayTimeInSec="60">
<AuthConfig tokenCodeKey="" tokenCode="" tokenUser="" tokenUserKey=""/>
<SendLogCache size="300" maxConsumeThread="10"/>
<SendBuffer size="50" expireTimeInSec="2"/>
</RpcLogSender>
</StreamRpcLog>`
</appenders>
<loggers>
<root level="INFO" additivity="true">
<!-- <appender-ref ref="Console"/>-->
<appender-ref ref="StreamRpcLog"/>
</root>
</loggers>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public class RpcLogSenderConfig {
*/
private int maxDelayTimeInSec = 60;

/**
* If open debug mode
*/
private boolean debugMode = false;
/**
* Auth config
*/
Expand Down Expand Up @@ -149,6 +153,14 @@ public void setServerRecoveryTimeInSec(int serverRecoveryTimeInSec) {
this.serverRecoveryTimeInSec = serverRecoveryTimeInSec;
}

public boolean isDebugMode() {
return debugMode;
}

public void setDebugMode(boolean debugMode) {
this.debugMode = debugMode;
}

@Override
public String toString() {
return "RpcLogSenderConfig{" +
Expand All @@ -161,6 +173,8 @@ public String toString() {
", authConfig=" + authConfig +
", cacheConfig=" + cacheConfig +
", bufferConfig=" + bufferConfig +
", debug=" + debugMode +
'}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,17 @@ public class SendLogCacheConfig {
/**
* Max number of consuming thread
*/
private int maxConsumeThread = 10;
private int maxConsumeThread = 2;

/**
* The switch to discard log
*/
private boolean discard = true;

/**
* Discard window in second
*/
private int discardWindow = 2;

public SendLogCacheConfig(){

Expand All @@ -40,11 +50,32 @@ public void setMaxConsumeThread(int maxConsumeThread) {
this.maxConsumeThread = maxConsumeThread;
}

public boolean isDiscard() {
return discard;
}

public void setDiscard(boolean discard) {
this.discard = discard;
}

public int getDiscardWindow() {
return discardWindow;
}

public void setDiscardWindow(int discardWindow) {
this.discardWindow = discardWindow;
}

@Override
public String toString() {
return "SendLogCacheConfig{" +
"size=" + size +
", maxConsumeThread=" + maxConsumeThread +
", discard=" + discard +
", discardWindow=" + discardWindow +
'}';
}



}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.webank.wedatasphere.streamis.jobmanager.log.collector.config;

import com.webank.wedatasphere.streamis.jobmanager.log.collector.message.filters.LogMessageFilter;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

Expand All @@ -14,9 +17,15 @@ public class StreamisLogAppenderConfig {

protected final RpcLogSenderConfig senderConfig;

protected StreamisLogAppenderConfig(String applicationName, RpcLogSenderConfig rpcLogSenderConfig){
/**
* Message filters
*/
protected final List<LogMessageFilter> messageFilters;
protected StreamisLogAppenderConfig(String applicationName, RpcLogSenderConfig rpcLogSenderConfig,
List<LogMessageFilter> messageFilters){
this.applicationName = applicationName;
this.senderConfig = null != rpcLogSenderConfig? rpcLogSenderConfig : new RpcLogSenderConfig();
this.messageFilters = messageFilters;
}

public static class Builder{
Expand All @@ -30,6 +39,11 @@ public static class Builder{
*/
protected final RpcLogSenderConfig rpcLogSenderConfig;

/**
* Message filters
*/
protected final List<LogMessageFilter> messageFilters = new ArrayList<>();

public Builder(String applicationName,
RpcLogSenderConfig rpcLogSenderConfig){
this.applicationName = applicationName;
Expand Down Expand Up @@ -189,8 +203,47 @@ public StreamisLogAppenderConfig.Builder setRpcBufferExpireTimeInSec(int expireT
return this;
}

/**
* Add log message filter
* @param messageFilter message filter
* @return builder
*/
public StreamisLogAppenderConfig.Builder withMessageFilter(LogMessageFilter messageFilter){
this.messageFilters.add(messageFilter);
return this;
}

/**
* Set to discard the useless log
* @param discard discard
* @return builder
*/
public StreamisLogAppenderConfig.Builder setDiscard(boolean discard){
this.rpcLogSenderConfig.getCacheConfig().setDiscard(discard);
return this;
}

/**
* Set the window size of discarding
* @param windowSize
* @return
*/
public StreamisLogAppenderConfig.Builder setDiscardWindow(int windowSize){
this.rpcLogSenderConfig.getCacheConfig().setDiscardWindow(windowSize);
return this;
}
/**
* Switch to debug
* @param debugMode debug mode
* @return builder
*/
public StreamisLogAppenderConfig.Builder setDebugMode(boolean debugMode){
this.rpcLogSenderConfig.setDebugMode(debugMode);
return this;
}

public StreamisLogAppenderConfig build(){
return new StreamisLogAppenderConfig(applicationName, rpcLogSenderConfig);
return new StreamisLogAppenderConfig(applicationName, rpcLogSenderConfig, messageFilters);
}
}
public String getApplicationName() {
Expand All @@ -202,4 +255,7 @@ public RpcLogSenderConfig getSenderConfig() {
return senderConfig;
}

public List<LogMessageFilter> getMessageFilters() {
return messageFilters;
}
}
Loading

0 comments on commit 4052f2f

Please sign in to comment.