-
Notifications
You must be signed in to change notification settings - Fork 999
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
…647) [collector]feature:Add monitoring parameters for Redis Cluster and Sentinel [collector]feature:Add monitoring parameters for Redis Cluster and Sentinel [collector]feature:Add monitoring for cluster and sentinel Update manager src main resources define param param-redis.yml Co-authored-by: tomsun28 <tomsun28@outlook.com> --------- Co-authored-by: hudongdong <hudongdong9406@zto.com> Co-authored-by: tomsun28 <tomsun28@outlook.com>
- Loading branch information
1 parent
f8f5be2
commit 1a2b58e
Showing
14 changed files
with
1,303 additions
and
136 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
81 changes: 81 additions & 0 deletions
81
collector/src/main/java/com/usthe/collector/collect/redis/RedisClusterCollectImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
package com.usthe.collector.collect.redis; | ||
|
||
import com.usthe.collector.collect.common.cache.CacheIdentifier; | ||
import com.usthe.collector.collect.common.cache.CommonCache; | ||
import com.usthe.collector.collect.common.cache.RedisConnect; | ||
import com.usthe.common.entity.job.Metrics; | ||
import com.usthe.common.entity.job.protocol.RedisProtocol; | ||
import com.usthe.common.entity.message.CollectRep; | ||
import com.usthe.common.util.CommonConstants; | ||
import io.lettuce.core.cluster.RedisClusterClient; | ||
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; | ||
import io.lettuce.core.resource.ClientResources; | ||
import io.lettuce.core.resource.DefaultClientResources; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
/** | ||
* @description: Redis 集群指标收集器 | ||
* @author: hdd | ||
* @create: 2023/02/17 | ||
*/ | ||
@Slf4j | ||
public class RedisClusterCollectImpl extends RedisCommonCollectImpl { | ||
|
||
private static final String CLUSTER_INFO = "cluster"; | ||
|
||
private final ClientResources defaultClientResources; | ||
|
||
public RedisClusterCollectImpl() { | ||
defaultClientResources = DefaultClientResources.create(); | ||
} | ||
|
||
|
||
public Map<String, String> getRedisInfo(Metrics metrics) { | ||
StatefulRedisClusterConnection<String, String> connection = getConnection(metrics.getRedis()); | ||
String info = connection.sync().info(); | ||
Map<String, String> valueMap = parseInfo(info); | ||
if (Objects.equals(metrics.getName(), CLUSTER_INFO)) { | ||
String clusterNodes = connection.sync().clusterInfo(); | ||
valueMap.putAll(parseInfo(clusterNodes)); | ||
} | ||
if (log.isDebugEnabled()) { | ||
log.debug("[RedisSingleCollectImpl] fetch redis info"); | ||
valueMap.forEach((k, v) -> log.debug("{} : {}", k, v)); | ||
} | ||
return valueMap; | ||
} | ||
|
||
|
||
/** | ||
* obtain StatefulRedisClusterConnection | ||
* | ||
* @param redisProtocol | ||
* @return | ||
*/ | ||
private StatefulRedisClusterConnection<String, String> getConnection(RedisProtocol redisProtocol) { | ||
CacheIdentifier identifier = doIdentifier(redisProtocol); | ||
|
||
StatefulRedisClusterConnection<String, String> connection = (StatefulRedisClusterConnection<String, String>) getStatefulConnection(identifier); | ||
if (connection == null) { | ||
// reuse connection failed, new one | ||
RedisClusterClient redisClusterClient = buildClient(redisProtocol); | ||
connection = redisClusterClient.connect(); | ||
CommonCache.getInstance().addCache(identifier, new RedisConnect(connection)); | ||
} | ||
return connection; | ||
} | ||
|
||
|
||
/** | ||
* build single redis client | ||
* | ||
* @param redisProtocol redis protocol config | ||
* @return redis single client | ||
*/ | ||
private RedisClusterClient buildClient(RedisProtocol redisProtocol) { | ||
return RedisClusterClient.create(defaultClientResources, redisUri(redisProtocol)); | ||
} | ||
} |
196 changes: 196 additions & 0 deletions
196
collector/src/main/java/com/usthe/collector/collect/redis/RedisCommonCollectImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,196 @@ | ||
package com.usthe.collector.collect.redis; | ||
|
||
import com.usthe.collector.collect.AbstractCollect; | ||
import com.usthe.collector.collect.common.cache.CacheIdentifier; | ||
import com.usthe.collector.collect.common.cache.CommonCache; | ||
import com.usthe.collector.collect.common.cache.RedisConnect; | ||
import com.usthe.collector.dispatch.DispatchConstants; | ||
import com.usthe.collector.util.CollectUtil; | ||
import com.usthe.common.entity.job.Metrics; | ||
import com.usthe.common.entity.job.protocol.RedisProtocol; | ||
import com.usthe.common.entity.message.CollectRep; | ||
import com.usthe.common.util.CommonConstants; | ||
import com.usthe.common.util.CommonUtil; | ||
import io.lettuce.core.RedisConnectionException; | ||
import io.lettuce.core.RedisURI; | ||
import io.lettuce.core.api.StatefulConnection; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.springframework.util.Assert; | ||
import org.springframework.util.StringUtils; | ||
|
||
import java.time.Duration; | ||
import java.util.*; | ||
|
||
/** | ||
* @description: | ||
* @author: hdd | ||
* @create: 2023/02/19 | ||
*/ | ||
@Slf4j | ||
public class RedisCommonCollectImpl extends AbstractCollect { | ||
|
||
|
||
private static final String CLUSTER = "3"; | ||
|
||
@Override | ||
public void collect(CollectRep.MetricsData.Builder builder, long appId, String app, Metrics metrics) { | ||
try { | ||
preCheck(metrics); | ||
} catch (Exception e) { | ||
builder.setCode(CollectRep.Code.FAIL); | ||
builder.setMsg(e.getMessage()); | ||
return; | ||
} | ||
try { | ||
Map<String, String> redisInfo ; | ||
if (Objects.nonNull(metrics.getRedis().getPattern()) && Objects.equals(metrics.getRedis().getPattern(), CLUSTER)) { | ||
RedisClusterCollectImpl redisClusterCollect = new RedisClusterCollectImpl(); | ||
redisInfo = redisClusterCollect.getRedisInfo(metrics); | ||
} else { | ||
RedisSingleCollectImpl redisSingleCollect = new RedisSingleCollectImpl(); | ||
redisInfo = redisSingleCollect.getRedisInfo(metrics); | ||
} | ||
doMetricsData(builder, redisInfo, metrics); | ||
} catch (RedisConnectionException connectionException) { | ||
String errorMsg = CommonUtil.getMessageFromThrowable(connectionException); | ||
log.info("[redis connection] error: {}", errorMsg); | ||
builder.setCode(CollectRep.Code.UN_CONNECTABLE); | ||
builder.setMsg(errorMsg); | ||
} catch (Exception e) { | ||
String errorMsg = CommonUtil.getMessageFromThrowable(e); | ||
log.warn("[redis collect] error: {}", e.getMessage(), e); | ||
builder.setCode(CollectRep.Code.FAIL); | ||
builder.setMsg(errorMsg); | ||
} | ||
|
||
} | ||
|
||
@Override | ||
public String supportProtocol() { | ||
return DispatchConstants.PROTOCOL_REDIS; | ||
} | ||
|
||
/** | ||
* parse redis info | ||
* | ||
* @param info redis info | ||
* @return parsed redis info | ||
*/ | ||
protected Map<String, String> parseInfo(String info) { | ||
String[] lines = info.split("\n"); | ||
Map<String, String> result = new HashMap<>(128); | ||
Arrays.stream(lines) | ||
.filter(it -> StringUtils.hasText(it) && !it.startsWith("#") && it.contains(":")) | ||
.map(this::removeCr) | ||
.map(r -> r.split(":")) | ||
.forEach(it -> { | ||
if (it.length > 1) { | ||
result.put(it[0], it[1]); | ||
} | ||
}); | ||
return result; | ||
} | ||
|
||
|
||
/** | ||
* structure | ||
* @param redisProtocol | ||
* @return | ||
*/ | ||
protected RedisURI redisUri(RedisProtocol redisProtocol) { | ||
RedisURI redisUri = RedisURI.create(redisProtocol.getHost(), Integer.parseInt(redisProtocol.getPort())); | ||
if (StringUtils.hasText(redisProtocol.getUsername())) { | ||
redisUri.setUsername(redisProtocol.getUsername()); | ||
} | ||
if (StringUtils.hasText(redisProtocol.getPassword())) { | ||
redisUri.setPassword(redisProtocol.getPassword().toCharArray()); | ||
} | ||
Duration timeout = Duration.ofMillis(CollectUtil.getTimeout(redisProtocol.getTimeout())); | ||
redisUri.setTimeout(timeout); | ||
return redisUri; | ||
} | ||
|
||
|
||
/** | ||
* build redis cache key | ||
* @param redisProtocol | ||
* @return | ||
*/ | ||
protected CacheIdentifier doIdentifier(RedisProtocol redisProtocol) { | ||
return CacheIdentifier.builder() | ||
.ip(redisProtocol.getHost()) | ||
.port(redisProtocol.getPort()) | ||
.username(redisProtocol.getUsername()) | ||
.password(redisProtocol.getPassword()) | ||
.build(); | ||
} | ||
|
||
|
||
/** | ||
* get redis connection | ||
* @param identifier | ||
* @return | ||
*/ | ||
protected StatefulConnection<String, String> getStatefulConnection(CacheIdentifier identifier) { | ||
StatefulConnection<String, String> connection = null; | ||
Optional<Object> cacheOption = CommonCache.getInstance().getCache(identifier, true); | ||
if (cacheOption.isPresent()) { | ||
RedisConnect redisConnect = (RedisConnect) cacheOption.get(); | ||
connection = redisConnect.getConnection(); | ||
if (!connection.isOpen()) { | ||
try { | ||
connection.closeAsync(); | ||
} catch (Exception e) { | ||
log.info("The redis connect form cache, close error: {}", e.getMessage()); | ||
} | ||
connection = null; | ||
CommonCache.getInstance().removeCache(identifier); | ||
} | ||
} | ||
return connection; | ||
} | ||
|
||
|
||
/** | ||
* Build monitoring parameters according to redis info | ||
* @param builder | ||
* @param valueMap | ||
* @param metrics | ||
*/ | ||
private void doMetricsData(CollectRep.MetricsData.Builder builder, Map<String, String> valueMap, Metrics metrics) { | ||
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); | ||
metrics.getAliasFields().forEach(it -> { | ||
if (valueMap.containsKey(it)) { | ||
String fieldValue = valueMap.get(it); | ||
if (fieldValue == null) { | ||
valueRowBuilder.addColumns(CommonConstants.NULL_VALUE); | ||
} else { | ||
valueRowBuilder.addColumns(fieldValue); | ||
} | ||
} else { | ||
valueRowBuilder.addColumns(CommonConstants.NULL_VALUE); | ||
} | ||
}); | ||
builder.addValues(valueRowBuilder.build()); | ||
} | ||
|
||
|
||
/** | ||
* preCheck params | ||
*/ | ||
private void preCheck(Metrics metrics) { | ||
if (metrics == null || metrics.getRedis() == null) { | ||
throw new IllegalArgumentException("Redis collect must has redis params"); | ||
} | ||
RedisProtocol redisProtocol = metrics.getRedis(); | ||
Assert.hasText(redisProtocol.getHost(), "Redis Protocol host is required."); | ||
Assert.hasText(redisProtocol.getPort(), "Redis Protocol port is required."); | ||
} | ||
|
||
|
||
private String removeCr(String value) { | ||
return value.replace("\r", ""); | ||
} | ||
|
||
|
||
} |
Oops, something went wrong.