Skip to content

Commit

Permalink
remove sleep, probably busy-waiting (#1456)
Browse files Browse the repository at this point in the history
Signed-off-by: tomsun28 <tomsun28@outlook.com>
  • Loading branch information
tomsun28 authored Dec 29, 2023
1 parent 38628b6 commit c18d559
Show file tree
Hide file tree
Showing 17 changed files with 134 additions and 174 deletions.
5 changes: 1 addition & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,6 @@ HertzBeat is a top project under the [Dromara Open Source Community](https://dro

[Reddit Community](https://www.reddit.com/r/hertzbeat/)

[User Club](https://support.qq.com/products/379369)

[Follow Us Twitter](https://twitter.com/hertzbeat1024)

[Subscribe YouTube](https://www.youtube.com/channel/UCri75zfWX0GHqJFPENEbLow)
Expand All @@ -441,8 +439,7 @@ HertzBeat is a top project under the [Dromara Open Source Community](https://dro

##### Sponsor

- Thanks [吉实信息(构建全新的微波+光交易网络)](https://www.flarespeed.com) sponsored server node.
- Thanks [蓝易云(全新智慧上云)](https://www.tsyvps.com/aff/BZBEGYLX) sponsored server node.
- Thanks [吉实信息(构建全新的微波+光交易网络)](https://www.flarespeed.com) sponsored server node.


##### Open-Source Project Build From Open-Source
Expand Down
5 changes: 1 addition & 4 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,6 @@ HertzBeat 赫兹跳动是 [Dromara开源社区](https://dromara.org/) 下顶级

[Reddit Community](https://www.reddit.com/r/hertzbeat/)

[User Club](https://support.qq.com/products/379369)

[Follow Us Twitter](https://twitter.com/hertzbeat1024)

[Subscribe YouTube](https://www.youtube.com/channel/UCri75zfWX0GHqJFPENEbLow)
Expand All @@ -446,8 +444,7 @@ HertzBeat 赫兹跳动是 [Dromara开源社区](https://dromara.org/) 下顶级

##### 赞助

- 感谢 [吉实信息(构建全新的微波+光交易网络)](https://www.flarespeed.com) 赞助服务器采集节点
- 感谢 [蓝易云(全新智慧上云)](https://www.tsyvps.com/aff/BZBEGYLX) 赞助服务器采集节点
- 感谢 [吉实信息(构建全新的微波+光交易网络)](https://www.flarespeed.com) 赞助服务器采集节点

##### Open-Source Project Build From Open-Source

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.dromara.hertzbeat.collector.collect.common.cache;


import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -26,20 +27,20 @@
import java.util.concurrent.*;

/**
* lru common resource cache
* lru common resource cache for client-server connection
*
* @author tomsun28
*/
@Slf4j
public class CommonCache {
public class ConnectionCommonCache {

/**
* default cache time 800s
* default cache time 200s
*/
private static final long DEFAULT_CACHE_TIMEOUT = 800 * 1000L;
private static final long DEFAULT_CACHE_TIMEOUT = 200 * 1000L;

/**
* default cache num
* default max cache num
*/
private static final int DEFAULT_MAX_CAPACITY = 10000;

Expand All @@ -61,9 +62,9 @@ public class CommonCache {
/**
* the executor who clean cache when timeout
*/
private ThreadPoolExecutor cleanTimeoutExecutor;
private ThreadPoolExecutor timeoutCleanerExecutor;

private CommonCache() {
private ConnectionCommonCache() {
init();
}

Expand All @@ -76,23 +77,25 @@ private void init() {
if (value instanceof CacheCloseable) {
((CacheCloseable) value).close();
}
log.info("lru cache discard key: {}, value: {}.", key, value);
log.info("connection common cache discard key: {}, value: {}.", key, value);
}).build();
timeoutMap = new ConcurrentHashMap<>(DEFAULT_MAX_CAPACITY >> 6);
cleanTimeoutExecutor = new ThreadPoolExecutor(1, 1,
1, TimeUnit.SECONDS,
// last-first-coverage algorithm, run the first and last thread, discard mid
timeoutCleanerExecutor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
r -> new Thread(r, "lru-cache-timeout-cleaner"),
r -> new Thread(r, "connection-cache-timeout-cleaner"),
new ThreadPoolExecutor.DiscardOldestPolicy());
// init monitor available detector cyc task
ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(1,
r -> new Thread(r, "lru-cache-available-detector"));
scheduledExecutor.scheduleWithFixedDelay(this::detectCacheAvailable,
2, 20, TimeUnit.MINUTES);
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("connection-cache-ava-detector-%d")
.setDaemon(true)
.build();
ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);
scheduledExecutor.scheduleWithFixedDelay(this::detectCacheAvailable, 2, 20, TimeUnit.MINUTES);
}

/**
* detect all cache available, cleanup not ava object
* detect all cache available, cleanup not ava connection
*/
private void detectCacheAvailable() {
try {
Expand All @@ -110,7 +113,7 @@ private void detectCacheAvailable() {
}
});
} catch (Exception e) {
log.error("detect cache available error: {}.", e.getMessage(), e);
log.error("connection common cache detect cache available error: {}.", e.getMessage(), e);
}
}

Expand All @@ -127,17 +130,18 @@ private void cleanTimeoutCache() {
timeoutMap.put(key, new Long[]{currentTime, DEFAULT_CACHE_TIMEOUT});
} else if (cacheTime[0] + cacheTime[1] < currentTime) {
// timeout, remove this object cache
log.warn("[cache] clean the timeout cache, key {}", key);
log.warn("[connection common cache] clean the timeout cache, key {}", key);
timeoutMap.remove(key);
cacheMap.remove(key);
if (value instanceof CacheCloseable) {
log.warn("[cache] close the timeout cache, key {}", key);
log.warn("[connection common cache] close the timeout cache, key {}", key);
((CacheCloseable) value).close();
}
}
});
Thread.sleep(20 * 1000);
} catch (Exception e) {
log.error("[cache] clean timeout cache error: {}.", e.getMessage(), e);
log.error("[connection common cache] clean timeout cache error: {}.", e.getMessage(), e);
}
}

Expand All @@ -155,14 +159,7 @@ public void addCache(Object key, Object value, Long timeDiff) {
}
cacheMap.put(key, value);
timeoutMap.put(key, new Long[]{System.currentTimeMillis(), timeDiff});
cleanTimeoutExecutor.execute(() -> {
try {
cleanTimeoutCache();
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
});
timeoutCleanerExecutor.execute(this::cleanTimeoutCache);
}

/**
Expand All @@ -185,18 +182,18 @@ public void addCache(Object key, Object value) {
public Optional<Object> getCache(Object key, boolean refreshCache) {
Long[] cacheTime = timeoutMap.get(key);
if (cacheTime == null || cacheTime.length != CACHE_TIME_LENGTH) {
log.info("[cache] not hit the cache, key {}.", key);
log.info("[connection common cache] not hit the cache, key {}.", key);
return Optional.empty();
}
if (cacheTime[0] + cacheTime[1] < System.currentTimeMillis()) {
log.warn("[cache] is timeout, remove it, key {}.", key);
log.warn("[connection common cache] is timeout, remove it, key {}.", key);
timeoutMap.remove(key);
cacheMap.remove(key);
return Optional.empty();
}
Object value = cacheMap.get(key);
if (value == null) {
log.error("[cache] value is null, remove it, key {}.", key);
log.error("[connection common cache] value is null, remove it, key {}.", key);
cacheMap.remove(key);
timeoutMap.remove(key);
} else if (refreshCache) {
Expand All @@ -222,16 +219,16 @@ public void removeCache(Object key) {
/**
* get common cache instance
*
* @return cache
* @return connection common cache
*/
public static CommonCache getInstance() {
public static ConnectionCommonCache getInstance() {
return SingleInstance.INSTANCE;
}

/**
* static instance
* static single instance
*/
private static class SingleInstance {
private static final CommonCache INSTANCE = new CommonCache();
private static final ConnectionCommonCache INSTANCE = new ConnectionCommonCache();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
@Slf4j
public class JdbcConnect implements CacheCloseable {

private Connection connection;
private final Connection connection;

public JdbcConnect(Connection connection) {
this.connection = connection;
Expand All @@ -41,7 +41,7 @@ public void close() {
connection.close();
}
} catch (Exception e) {
log.error("close jdbc connect error: {}", e.getMessage());
log.error("[connection common cache] close jdbc connect error: {}", e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
@Slf4j
public class JmxConnect implements CacheCloseable {

private JMXConnector connection;
private final JMXConnector connection;

public JmxConnect(JMXConnector connection) {
this.connection = connection;
Expand All @@ -25,7 +25,7 @@ public void close() {
connection.close();
}
} catch (Exception e) {
log.error("close redis connect error: {}", e.getMessage());
log.error("[connection common cache] close jmx connect error: {}", e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
*/
@Slf4j
public class MongodbConnect implements CacheCloseable {
private MongoClient mongoClient;
private final MongoClient mongoClient;

public MongodbConnect(MongoClient mongoClient) {
this.mongoClient = mongoClient;
Expand All @@ -22,7 +22,7 @@ public void close() {
this.mongoClient.close();
}
} catch (Exception e) {
log.error(e.getMessage(), e);
log.error("[connection common cache] close mongodb connect error: {}", e.getMessage());
}
}
public MongoClient getMongoClient() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
@Slf4j
public class RedisConnect implements CacheCloseable {

private StatefulConnection<String, String> connection;
private final StatefulConnection<String, String> connection;

public RedisConnect(StatefulConnection<String, String> connection) {
this.connection = connection;
Expand All @@ -40,7 +40,7 @@ public void close() {
connection.closeAsync();
}
} catch (Exception e) {
log.error("close redis connect error: {}", e.getMessage());
log.error("[connection common cache] close redis connect error: {}", e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
*/
@Slf4j
public class SshConnect implements CacheCloseable {
private ClientSession clientSession;
private final ClientSession clientSession;

public SshConnect(ClientSession clientSession) {
this.clientSession = clientSession;
Expand All @@ -22,7 +22,7 @@ public void close() {
clientSession.close();
}
} catch (Exception e) {
log.error("close ssh connect error: {}", e.getMessage());
log.error("[connection common cache] close ssh connect error: {}", e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.dromara.hertzbeat.collector.collect.common.http;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.Registry;
Expand All @@ -37,6 +38,9 @@
import java.security.cert.CertificateExpiredException;
import java.security.cert.X509Certificate;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -76,8 +80,7 @@ public class CommonHttpClient {
private static final int SOCKET_TIMEOUT = 60000;

/**
* validated time for idle connection
* 空闲连接免检的有效时间,被重用的空闲连接若超过此时间,需检查此连接的可用性
* validated time for idle connection. if when reuse this connection after this time, we will check it available.
*/
private static final int INACTIVITY_VALIDATED_TIME = 10000;

Expand All @@ -91,13 +94,12 @@ public class CommonHttpClient {
SSLContext sslContext = SSLContexts.createDefault();
X509TrustManager x509TrustManager = new X509TrustManager() {
@Override
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { }
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) { }
@Override
public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
// check server certificate timeout
// 判断服务器证书有效时间
// check server ssl certificate expired
Date now = new Date();
if (x509Certificates != null && x509Certificates.length > 0) {
if (x509Certificates != null) {
for (X509Certificate certificate : x509Certificates) {
Date deadline = certificate.getNotAfter();
if (deadline != null && now.after(deadline)) {
Expand Down Expand Up @@ -130,26 +132,21 @@ public void checkServerTrusted(X509Certificate[] x509Certificates, String s) thr
httpClient = HttpClients.custom()
.setConnectionManager(connectionManager)
.setDefaultRequestConfig(requestConfig)
// 定期清理不可用过期连接
// clean up unavailable expired connections
.evictExpiredConnections()
// 定期清理可用但空闲的连接
// clean up available but idle connections
.evictIdleConnections(100, TimeUnit.SECONDS)
.build();
Thread connectCleaner = new Thread(() -> {
while (Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(30000);
connectionManager.closeExpiredConnections();
connectionManager.closeIdleConnections(100, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
}
});
connectCleaner.setName("http-connection-pool-cleaner");
connectCleaner.setDaemon(true);
connectCleaner.start();
} catch (Exception e) {
}
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("http-connection-pool-cleaner-%d")
.setDaemon(true)
.build();
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1, threadFactory);
scheduledExecutor.scheduleWithFixedDelay(() -> {
connectionManager.closeExpiredConnections();
connectionManager.closeIdleConnections(100, TimeUnit.SECONDS);
}, 40L, 40L, TimeUnit.SECONDS);
} catch (Exception ignored) {}
}

public static CloseableHttpClient getHttpClient() {
Expand Down
Loading

0 comments on commit c18d559

Please sign in to comment.