Skip to content

Commit

Permalink
bugfix mongodb collect cause system thread oom (#1488)
Browse files Browse the repository at this point in the history
Signed-off-by: tomsun28 <tomsun28@outlook.com>
  • Loading branch information
tomsun28 authored Jan 16, 2024
1 parent d4ac893 commit 7c2bd1a
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import com.mongodb.MongoServerUnavailableException;
import com.mongodb.MongoTimeoutException;
import com.mongodb.client.ClientSession;
import org.dromara.hertzbeat.collector.collect.common.cache.CacheIdentifier;
import org.dromara.hertzbeat.collector.collect.common.cache.ConnectionCommonCache;
import org.dromara.hertzbeat.collector.collect.common.cache.MongodbConnect;
Expand Down Expand Up @@ -99,15 +100,17 @@ public void collect(CollectRep.MetricsData.Builder builder, long monitorId, Stri
builder.setMsg("unsupported mongodb diagnostic command: " + command);
return;
}
ClientSession clientSession = null;
try {
MongoClient mongoClient = getClient(metrics);
MongoDatabase mongoDatabase = mongoClient.getDatabase(metrics.getMongodb().getDatabase());
clientSession = mongoClient.startSession();
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
Document document;
if (metricsParts.length == 1) {
document = mongoDatabase.runCommand(new Document(command, 1));
document = mongoDatabase.runCommand(clientSession, new Document(command, 1));
} else {
document = mongoDatabase.runCommand(new Document(command, 1));
document = mongoDatabase.runCommand(clientSession, new Document(command, 1));
for (int i = 1; i < metricsParts.length; i++) {
document = (Document) document.get(metricsParts[i]);
}
Expand All @@ -126,6 +129,12 @@ public void collect(CollectRep.MetricsData.Builder builder, long monitorId, Stri
String message = CommonUtil.getMessageFromThrowable(e);
builder.setMsg(message);
log.warn(message, e);
} finally {
if (clientSession != null) {
try {
clientSession.close();
} catch (Exception ignored) {}
}
}
}

Expand Down Expand Up @@ -168,7 +177,9 @@ private void preCheck(Metrics metrics) {
}

/**
* 通过metrics中的mongodb连接信息获取 mongodb client
* 通过metrics中的mongodb连接信息获取
* mongodb client本身不存在网络调用,和网络链接。对每次采集,我们需要新建session并使用后关闭它
* mongodb client is thread pool, we need to create the session for each collect
*/
private MongoClient getClient(Metrics metrics) {
MongodbProtocol mongodbProtocol = metrics.getMongodb();
Expand All @@ -181,32 +192,18 @@ private MongoClient getClient(Metrics metrics) {
if (cacheOption.isPresent()) {
MongodbConnect mongodbConnect = (MongodbConnect) cacheOption.get();
mongoClient = mongodbConnect.getMongoClient();
try {
// detect this connection is available?
mongoClient.getClusterDescription();
} catch (Exception e) {
log.info("The mongodb connect client from cache is invalid: {}", e.getMessage());
try {
mongoClient.close();
} catch (Exception e2) {
log.error(e2.getMessage());
}
mongoClient = null;
ConnectionCommonCache.getInstance().removeCache(identifier);
}
}
if (mongoClient != null) {
return mongoClient;
}
// 复用失败则新建连接 connect to mongodb
String url;
// 密码可能包含特殊字符,需要使用类似js的encodeURIComponent进行编码,这里使用java的URLEncoder
url = String.format("mongodb://%s:%s@%s:%s/%s?authSource=%s", mongodbProtocol.getUsername(),
String url = String.format("mongodb://%s:%s@%s:%s/%s?authSource=%s", mongodbProtocol.getUsername(),
URLEncoder.encode(mongodbProtocol.getPassword(), StandardCharsets.UTF_8), mongodbProtocol.getHost(), mongodbProtocol.getPort(),
mongodbProtocol.getDatabase(), mongodbProtocol.getAuthenticationDatabase());
mongoClient = MongoClients.create(url);
MongodbConnect mongodbConnect = new MongodbConnect(mongoClient);
ConnectionCommonCache.getInstance().addCache(identifier, mongodbConnect);
ConnectionCommonCache.getInstance().addCache(identifier, mongodbConnect, 3600 * 1000L);
return mongoClient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public void start() {
log.error("[Dispatcher]-{}.", e.getMessage(), e);
}
}
log.info("Thread Interrupted, Shutdown the [metrics-task-dispatcher]");
});
// monitoring metrics collection task execution timeout
ThreadFactory threadFactory = new ThreadFactoryBuilder()
Expand Down Expand Up @@ -178,7 +179,7 @@ public void dispatchMetricsTask(Timeout timeout) {
job.constructPriorMetrics();
Set<Metrics> metricsSet = job.getNextCollectMetrics(null, true);
metricsSet.forEach(metrics -> {
MetricsCollect metricsCollect = new MetricsCollect(metrics, timeout, this,
MetricsCollect metricsCollect = new MetricsCollect(metrics, timeout, this,
collectorIdentity, unitConvertList);
jobRequestQueue.addJob(metricsCollect);
if (metrics.getPrometheus() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ private void initWorkExecutor() {
// thread factory
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setUncaughtExceptionHandler((thread, throwable) -> {
log.error("workerExecutor has uncaughtException.");
log.error(throwable.getMessage(), throwable);
log.error("[Important] WorkerPool workerExecutor has uncaughtException.", throwable);
log.error("Thread Name {} : {}", thread.getName(), throwable.getMessage(), throwable);
})
.setDaemon(true)
.setNameFormat("collect-worker-%d")
.build();
workerExecutor = new ThreadPoolExecutor(100,
800,
1024,
10,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
Expand Down
3 changes: 3 additions & 0 deletions collector/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
server:
port: 1159
shutdown: graceful
tomcat:
threads:
min-spare: 1
spring:
application:
name: ${HOSTNAME:@hertzbeat-collector@}${PID}
Expand Down

0 comments on commit 7c2bd1a

Please sign in to comment.