Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix mongodb collect cause system thread oom #1488

Merged
merged 2 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import com.mongodb.MongoServerUnavailableException;
import com.mongodb.MongoTimeoutException;
import com.mongodb.client.ClientSession;
import org.dromara.hertzbeat.collector.collect.common.cache.CacheIdentifier;
import org.dromara.hertzbeat.collector.collect.common.cache.ConnectionCommonCache;
import org.dromara.hertzbeat.collector.collect.common.cache.MongodbConnect;
Expand Down Expand Up @@ -99,15 +100,17 @@ public void collect(CollectRep.MetricsData.Builder builder, long monitorId, Stri
builder.setMsg("unsupported mongodb diagnostic command: " + command);
return;
}
ClientSession clientSession = null;
try {
MongoClient mongoClient = getClient(metrics);
MongoDatabase mongoDatabase = mongoClient.getDatabase(metrics.getMongodb().getDatabase());
clientSession = mongoClient.startSession();
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
Document document;
if (metricsParts.length == 1) {
document = mongoDatabase.runCommand(new Document(command, 1));
document = mongoDatabase.runCommand(clientSession, new Document(command, 1));
} else {
document = mongoDatabase.runCommand(new Document(command, 1));
document = mongoDatabase.runCommand(clientSession, new Document(command, 1));
for (int i = 1; i < metricsParts.length; i++) {
document = (Document) document.get(metricsParts[i]);
}
Expand All @@ -126,6 +129,12 @@ public void collect(CollectRep.MetricsData.Builder builder, long monitorId, Stri
String message = CommonUtil.getMessageFromThrowable(e);
builder.setMsg(message);
log.warn(message, e);
} finally {
if (clientSession != null) {
try {
clientSession.close();
} catch (Exception ignored) {}
}
}
}

Expand Down Expand Up @@ -168,7 +177,9 @@ private void preCheck(Metrics metrics) {
}

/**
* 通过metrics中的mongodb连接信息获取 mongodb client
* 通过metrics中的mongodb连接信息获取
* mongodb client本身不存在网络调用,和网络链接。对每次采集,我们需要新建session并使用后关闭它
* mongodb client is thread pool, we need to create the session for each collect
*/
private MongoClient getClient(Metrics metrics) {
MongodbProtocol mongodbProtocol = metrics.getMongodb();
Expand All @@ -181,32 +192,18 @@ private MongoClient getClient(Metrics metrics) {
if (cacheOption.isPresent()) {
MongodbConnect mongodbConnect = (MongodbConnect) cacheOption.get();
mongoClient = mongodbConnect.getMongoClient();
try {
// detect this connection is available?
mongoClient.getClusterDescription();
} catch (Exception e) {
log.info("The mongodb connect client from cache is invalid: {}", e.getMessage());
try {
mongoClient.close();
} catch (Exception e2) {
log.error(e2.getMessage());
}
mongoClient = null;
ConnectionCommonCache.getInstance().removeCache(identifier);
}
}
if (mongoClient != null) {
return mongoClient;
}
// 复用失败则新建连接 connect to mongodb
String url;
// 密码可能包含特殊字符,需要使用类似js的encodeURIComponent进行编码,这里使用java的URLEncoder
url = String.format("mongodb://%s:%s@%s:%s/%s?authSource=%s", mongodbProtocol.getUsername(),
String url = String.format("mongodb://%s:%s@%s:%s/%s?authSource=%s", mongodbProtocol.getUsername(),
URLEncoder.encode(mongodbProtocol.getPassword(), StandardCharsets.UTF_8), mongodbProtocol.getHost(), mongodbProtocol.getPort(),
mongodbProtocol.getDatabase(), mongodbProtocol.getAuthenticationDatabase());
mongoClient = MongoClients.create(url);
MongodbConnect mongodbConnect = new MongodbConnect(mongoClient);
ConnectionCommonCache.getInstance().addCache(identifier, mongodbConnect);
ConnectionCommonCache.getInstance().addCache(identifier, mongodbConnect, 3600 * 1000L);
return mongoClient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public void start() {
log.error("[Dispatcher]-{}.", e.getMessage(), e);
}
}
log.info("Thread Interrupted, Shutdown the [metrics-task-dispatcher]");
});
// monitoring metrics collection task execution timeout
ThreadFactory threadFactory = new ThreadFactoryBuilder()
Expand Down Expand Up @@ -178,7 +179,7 @@ public void dispatchMetricsTask(Timeout timeout) {
job.constructPriorMetrics();
Set<Metrics> metricsSet = job.getNextCollectMetrics(null, true);
metricsSet.forEach(metrics -> {
MetricsCollect metricsCollect = new MetricsCollect(metrics, timeout, this,
MetricsCollect metricsCollect = new MetricsCollect(metrics, timeout, this,
collectorIdentity, unitConvertList);
jobRequestQueue.addJob(metricsCollect);
if (metrics.getPrometheus() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ private void initWorkExecutor() {
// thread factory
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setUncaughtExceptionHandler((thread, throwable) -> {
log.error("workerExecutor has uncaughtException.");
log.error(throwable.getMessage(), throwable);
log.error("[Important] WorkerPool workerExecutor has uncaughtException.", throwable);
log.error("Thread Name {} : {}", thread.getName(), throwable.getMessage(), throwable);
})
.setDaemon(true)
.setNameFormat("collect-worker-%d")
.build();
workerExecutor = new ThreadPoolExecutor(100,
800,
1024,
10,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
Expand Down
3 changes: 3 additions & 0 deletions collector/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
server:
port: 1159
shutdown: graceful
tomcat:
threads:
min-spare: 1
spring:
application:
name: ${HOSTNAME:@hertzbeat-collector@}${PID}
Expand Down
Loading