From 7c7b004a36e44f633616240ceb6b29cd8e652982 Mon Sep 17 00:00:00 2001 From: tomsun28 Date: Tue, 16 Jan 2024 21:52:11 +0800 Subject: [PATCH] bugfix mongodb collect cause system thread oom (#1488) Signed-off-by: tomsun28 --- .../mongodb/MongodbSingleCollectImpl.java | 35 +++++++++---------- .../collector/dispatch/CommonDispatcher.java | 3 +- .../collector/dispatch/WorkerPool.java | 6 ++-- collector/src/main/resources/application.yml | 3 ++ 4 files changed, 24 insertions(+), 23 deletions(-) diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/mongodb/MongodbSingleCollectImpl.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/mongodb/MongodbSingleCollectImpl.java index efd6f4150c8..8b8600a9a08 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/mongodb/MongodbSingleCollectImpl.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/mongodb/MongodbSingleCollectImpl.java @@ -24,6 +24,7 @@ import com.mongodb.MongoServerUnavailableException; import com.mongodb.MongoTimeoutException; +import com.mongodb.client.ClientSession; import org.dromara.hertzbeat.collector.collect.common.cache.CacheIdentifier; import org.dromara.hertzbeat.collector.collect.common.cache.ConnectionCommonCache; import org.dromara.hertzbeat.collector.collect.common.cache.MongodbConnect; @@ -99,15 +100,17 @@ public void collect(CollectRep.MetricsData.Builder builder, long monitorId, Stri builder.setMsg("unsupported mongodb diagnostic command: " + command); return; } + ClientSession clientSession = null; try { MongoClient mongoClient = getClient(metrics); MongoDatabase mongoDatabase = mongoClient.getDatabase(metrics.getMongodb().getDatabase()); + clientSession = mongoClient.startSession(); CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); Document document; if (metricsParts.length == 1) { - document = mongoDatabase.runCommand(new Document(command, 1)); + document = mongoDatabase.runCommand(clientSession, new Document(command, 1)); } else { - document = mongoDatabase.runCommand(new Document(command, 1)); + document = mongoDatabase.runCommand(clientSession, new Document(command, 1)); for (int i = 1; i < metricsParts.length; i++) { document = (Document) document.get(metricsParts[i]); } @@ -126,6 +129,12 @@ public void collect(CollectRep.MetricsData.Builder builder, long monitorId, Stri String message = CommonUtil.getMessageFromThrowable(e); builder.setMsg(message); log.warn(message, e); + } finally { + if (clientSession != null) { + try { + clientSession.close(); + } catch (Exception ignored) {} + } } } @@ -168,7 +177,9 @@ private void preCheck(Metrics metrics) { } /** - * 通过metrics中的mongodb连接信息获取 mongodb client + * 通过metrics中的mongodb连接信息获取 + * mongodb client本身不存在网络调用,和网络链接。对每次采集,我们需要新建session并使用后关闭它 + * mongodb client is thread pool, we need to create the session for each collect */ private MongoClient getClient(Metrics metrics) { MongodbProtocol mongodbProtocol = metrics.getMongodb(); @@ -181,32 +192,18 @@ private MongoClient getClient(Metrics metrics) { if (cacheOption.isPresent()) { MongodbConnect mongodbConnect = (MongodbConnect) cacheOption.get(); mongoClient = mongodbConnect.getMongoClient(); - try { - // detect this connection is available? - mongoClient.getClusterDescription(); - } catch (Exception e) { - log.info("The mongodb connect client from cache is invalid: {}", e.getMessage()); - try { - mongoClient.close(); - } catch (Exception e2) { - log.error(e2.getMessage()); - } - mongoClient = null; - ConnectionCommonCache.getInstance().removeCache(identifier); - } } if (mongoClient != null) { return mongoClient; } // 复用失败则新建连接 connect to mongodb - String url; // 密码可能包含特殊字符,需要使用类似js的encodeURIComponent进行编码,这里使用java的URLEncoder - url = String.format("mongodb://%s:%s@%s:%s/%s?authSource=%s", mongodbProtocol.getUsername(), + String url = String.format("mongodb://%s:%s@%s:%s/%s?authSource=%s", mongodbProtocol.getUsername(), URLEncoder.encode(mongodbProtocol.getPassword(), StandardCharsets.UTF_8), mongodbProtocol.getHost(), mongodbProtocol.getPort(), mongodbProtocol.getDatabase(), mongodbProtocol.getAuthenticationDatabase()); mongoClient = MongoClients.create(url); MongodbConnect mongodbConnect = new MongodbConnect(mongoClient); - ConnectionCommonCache.getInstance().addCache(identifier, mongodbConnect); + ConnectionCommonCache.getInstance().addCache(identifier, mongodbConnect, 3600 * 1000L); return mongoClient; } } diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/CommonDispatcher.java b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/CommonDispatcher.java index d423fab804c..d1925374885 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/CommonDispatcher.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/CommonDispatcher.java @@ -126,6 +126,7 @@ public void start() { log.error("[Dispatcher]-{}.", e.getMessage(), e); } } + log.info("Thread Interrupted, Shutdown the [metrics-task-dispatcher]"); }); // monitoring metrics collection task execution timeout ThreadFactory threadFactory = new ThreadFactoryBuilder() @@ -178,7 +179,7 @@ public void dispatchMetricsTask(Timeout timeout) { job.constructPriorMetrics(); Set metricsSet = job.getNextCollectMetrics(null, true); metricsSet.forEach(metrics -> { - MetricsCollect metricsCollect = new MetricsCollect(metrics, timeout, this, + MetricsCollect metricsCollect = new MetricsCollect(metrics, timeout, this, collectorIdentity, unitConvertList); jobRequestQueue.addJob(metricsCollect); if (metrics.getPrometheus() != null) { diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/WorkerPool.java b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/WorkerPool.java index 0bfd3ae74dd..6ef64ad6a93 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/WorkerPool.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/WorkerPool.java @@ -45,14 +45,14 @@ private void initWorkExecutor() { // thread factory ThreadFactory threadFactory = new ThreadFactoryBuilder() .setUncaughtExceptionHandler((thread, throwable) -> { - log.error("workerExecutor has uncaughtException."); - log.error(throwable.getMessage(), throwable); + log.error("[Important] WorkerPool workerExecutor has uncaughtException.", throwable); + log.error("Thread Name {} : {}", thread.getName(), throwable.getMessage(), throwable); }) .setDaemon(true) .setNameFormat("collect-worker-%d") .build(); workerExecutor = new ThreadPoolExecutor(100, - 800, + 1024, 10, TimeUnit.SECONDS, new SynchronousQueue<>(), diff --git a/collector/src/main/resources/application.yml b/collector/src/main/resources/application.yml index d572631abcc..046b93240f0 100644 --- a/collector/src/main/resources/application.yml +++ b/collector/src/main/resources/application.yml @@ -15,6 +15,9 @@ server: port: 1159 shutdown: graceful + tomcat: + threads: + min-spare: 1 spring: application: name: ${HOSTNAME:@hertzbeat-collector@}${PID}