Skip to content

Commit

Permalink
task #614 [Add monitoring parameters for Redis Cluster and Sentinel] (#…
Browse files Browse the repository at this point in the history
…647)

  [collector]feature:Add monitoring parameters for Redis Cluster and Sentinel

  [collector]feature:Add monitoring parameters for Redis Cluster and Sentinel

  [collector]feature:Add monitoring for cluster and sentinel

  Update manager src main resources define param param-redis.yml

Co-authored-by: tomsun28 <tomsun28@outlook.com>

---------

Co-authored-by: hudongdong <hudongdong9406@zto.com>
Co-authored-by: tomsun28 <tomsun28@outlook.com>
  • Loading branch information
3 people authored Feb 20, 2023
1 parent 154f14f commit 9cec443
Show file tree
Hide file tree
Showing 14 changed files with 1,303 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.usthe.collector.collect.common.cache;

import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -29,9 +30,9 @@
@Slf4j
public class RedisConnect implements CacheCloseable {

private StatefulRedisConnection<String, String> connection;
private StatefulConnection<String, String> connection;

public RedisConnect(StatefulRedisConnection<String, String> connection) {
public RedisConnect(StatefulConnection<String, String> connection) {
this.connection = connection;
}

Expand All @@ -52,7 +53,7 @@ protected void finalize() throws Throwable {
super.finalize();
}

public StatefulRedisConnection<String, String> getConnection() {
public StatefulConnection<String, String> getConnection() {
return connection;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.usthe.collector.collect.redis;

import com.usthe.collector.collect.common.cache.CacheIdentifier;
import com.usthe.collector.collect.common.cache.CommonCache;
import com.usthe.collector.collect.common.cache.RedisConnect;
import com.usthe.common.entity.job.Metrics;
import com.usthe.common.entity.job.protocol.RedisProtocol;
import com.usthe.common.entity.message.CollectRep;
import com.usthe.common.util.CommonConstants;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.DefaultClientResources;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.Objects;

/**
* @description: Redis 集群指标收集器
* @author: hdd
* @create: 2023/02/17
*/
@Slf4j
public class RedisClusterCollectImpl extends RedisCommonCollectImpl {

private static final String CLUSTER_INFO = "cluster";

private final ClientResources defaultClientResources;

public RedisClusterCollectImpl() {
defaultClientResources = DefaultClientResources.create();
}


public Map<String, String> getRedisInfo(Metrics metrics) {
StatefulRedisClusterConnection<String, String> connection = getConnection(metrics.getRedis());
String info = connection.sync().info();
Map<String, String> valueMap = parseInfo(info);
if (Objects.equals(metrics.getName(), CLUSTER_INFO)) {
String clusterNodes = connection.sync().clusterInfo();
valueMap.putAll(parseInfo(clusterNodes));
}
if (log.isDebugEnabled()) {
log.debug("[RedisSingleCollectImpl] fetch redis info");
valueMap.forEach((k, v) -> log.debug("{} : {}", k, v));
}
return valueMap;
}


/**
* obtain StatefulRedisClusterConnection
*
* @param redisProtocol
* @return
*/
private StatefulRedisClusterConnection<String, String> getConnection(RedisProtocol redisProtocol) {
CacheIdentifier identifier = doIdentifier(redisProtocol);

StatefulRedisClusterConnection<String, String> connection = (StatefulRedisClusterConnection<String, String>) getStatefulConnection(identifier);
if (connection == null) {
// reuse connection failed, new one
RedisClusterClient redisClusterClient = buildClient(redisProtocol);
connection = redisClusterClient.connect();
CommonCache.getInstance().addCache(identifier, new RedisConnect(connection));
}
return connection;
}


/**
* build single redis client
*
* @param redisProtocol redis protocol config
* @return redis single client
*/
private RedisClusterClient buildClient(RedisProtocol redisProtocol) {
return RedisClusterClient.create(defaultClientResources, redisUri(redisProtocol));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package com.usthe.collector.collect.redis;

import com.usthe.collector.collect.AbstractCollect;
import com.usthe.collector.collect.common.cache.CacheIdentifier;
import com.usthe.collector.collect.common.cache.CommonCache;
import com.usthe.collector.collect.common.cache.RedisConnect;
import com.usthe.collector.dispatch.DispatchConstants;
import com.usthe.collector.util.CollectUtil;
import com.usthe.common.entity.job.Metrics;
import com.usthe.common.entity.job.protocol.RedisProtocol;
import com.usthe.common.entity.message.CollectRep;
import com.usthe.common.util.CommonConstants;
import com.usthe.common.util.CommonUtil;
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulConnection;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.time.Duration;
import java.util.*;

/**
* @description:
* @author: hdd
* @create: 2023/02/19
*/
@Slf4j
public class RedisCommonCollectImpl extends AbstractCollect {


private static final String CLUSTER = "3";

@Override
public void collect(CollectRep.MetricsData.Builder builder, long appId, String app, Metrics metrics) {
try {
preCheck(metrics);
} catch (Exception e) {
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg(e.getMessage());
return;
}
try {
Map<String, String> redisInfo ;
if (Objects.nonNull(metrics.getRedis().getPattern()) && Objects.equals(metrics.getRedis().getPattern(), CLUSTER)) {
RedisClusterCollectImpl redisClusterCollect = new RedisClusterCollectImpl();
redisInfo = redisClusterCollect.getRedisInfo(metrics);
} else {
RedisSingleCollectImpl redisSingleCollect = new RedisSingleCollectImpl();
redisInfo = redisSingleCollect.getRedisInfo(metrics);
}
doMetricsData(builder, redisInfo, metrics);
} catch (RedisConnectionException connectionException) {
String errorMsg = CommonUtil.getMessageFromThrowable(connectionException);
log.info("[redis connection] error: {}", errorMsg);
builder.setCode(CollectRep.Code.UN_CONNECTABLE);
builder.setMsg(errorMsg);
} catch (Exception e) {
String errorMsg = CommonUtil.getMessageFromThrowable(e);
log.warn("[redis collect] error: {}", e.getMessage(), e);
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg(errorMsg);
}

}

@Override
public String supportProtocol() {
return DispatchConstants.PROTOCOL_REDIS;
}

/**
* parse redis info
*
* @param info redis info
* @return parsed redis info
*/
protected Map<String, String> parseInfo(String info) {
String[] lines = info.split("\n");
Map<String, String> result = new HashMap<>(128);
Arrays.stream(lines)
.filter(it -> StringUtils.hasText(it) && !it.startsWith("#") && it.contains(":"))
.map(this::removeCr)
.map(r -> r.split(":"))
.forEach(it -> {
if (it.length > 1) {
result.put(it[0], it[1]);
}
});
return result;
}


/**
* structure
* @param redisProtocol
* @return
*/
protected RedisURI redisUri(RedisProtocol redisProtocol) {
RedisURI redisUri = RedisURI.create(redisProtocol.getHost(), Integer.parseInt(redisProtocol.getPort()));
if (StringUtils.hasText(redisProtocol.getUsername())) {
redisUri.setUsername(redisProtocol.getUsername());
}
if (StringUtils.hasText(redisProtocol.getPassword())) {
redisUri.setPassword(redisProtocol.getPassword().toCharArray());
}
Duration timeout = Duration.ofMillis(CollectUtil.getTimeout(redisProtocol.getTimeout()));
redisUri.setTimeout(timeout);
return redisUri;
}


/**
* build redis cache key
* @param redisProtocol
* @return
*/
protected CacheIdentifier doIdentifier(RedisProtocol redisProtocol) {
return CacheIdentifier.builder()
.ip(redisProtocol.getHost())
.port(redisProtocol.getPort())
.username(redisProtocol.getUsername())
.password(redisProtocol.getPassword())
.build();
}


/**
* get redis connection
* @param identifier
* @return
*/
protected StatefulConnection<String, String> getStatefulConnection(CacheIdentifier identifier) {
StatefulConnection<String, String> connection = null;
Optional<Object> cacheOption = CommonCache.getInstance().getCache(identifier, true);
if (cacheOption.isPresent()) {
RedisConnect redisConnect = (RedisConnect) cacheOption.get();
connection = redisConnect.getConnection();
if (!connection.isOpen()) {
try {
connection.closeAsync();
} catch (Exception e) {
log.info("The redis connect form cache, close error: {}", e.getMessage());
}
connection = null;
CommonCache.getInstance().removeCache(identifier);
}
}
return connection;
}


/**
* Build monitoring parameters according to redis info
* @param builder
* @param valueMap
* @param metrics
*/
private void doMetricsData(CollectRep.MetricsData.Builder builder, Map<String, String> valueMap, Metrics metrics) {
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
metrics.getAliasFields().forEach(it -> {
if (valueMap.containsKey(it)) {
String fieldValue = valueMap.get(it);
if (fieldValue == null) {
valueRowBuilder.addColumns(CommonConstants.NULL_VALUE);
} else {
valueRowBuilder.addColumns(fieldValue);
}
} else {
valueRowBuilder.addColumns(CommonConstants.NULL_VALUE);
}
});
builder.addValues(valueRowBuilder.build());
}


/**
* preCheck params
*/
private void preCheck(Metrics metrics) {
if (metrics == null || metrics.getRedis() == null) {
throw new IllegalArgumentException("Redis collect must has redis params");
}
RedisProtocol redisProtocol = metrics.getRedis();
Assert.hasText(redisProtocol.getHost(), "Redis Protocol host is required.");
Assert.hasText(redisProtocol.getPort(), "Redis Protocol port is required.");
}


private String removeCr(String value) {
return value.replace("\r", "");
}


}
Loading

0 comments on commit 9cec443

Please sign in to comment.