Skip to content

Commit

Permalink
[feature] support redis cluster and sentinel mode in real time data (#…
Browse files Browse the repository at this point in the history
…2324)

Co-authored-by: tomsun28 <tomsun28@outlook.com>
  • Loading branch information
Calvin979 and tomsun28 authored Jul 21, 2024
1 parent 7aa3be8 commit 712c541
Show file tree
Hide file tree
Showing 12 changed files with 450 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ public interface SignConstants {
String CARRIAGE_RETURN = "\r";

String RIGHT_DASH = "/";

String COMMA = ",";
}
12 changes: 8 additions & 4 deletions manager/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,19 @@ warehouse:
password: root
expire-time: '30d'
replication: 1

# store real-time metrics data, enable only one below
# store real-time metrics data, enable only one below
real-time:
memory:
enabled: true
init-size: 16
redis:
enabled: false
host: 127.0.0.1
port: 6379
# redis mode: single, sentinel, cluster. Default is single
mode: single
# separate each address with comma when using cluster mode, eg: 127.0.0.1:6379,127.0.0.1:6380
address: 127.0.0.1:6379
# enter master name when using sentinel mode
masterName: mymaster
password: 123456
# redis db index, default: DB0
db: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
@Slf4j
public abstract class AbstractRealTimeDataStorage implements RealTimeDataReader, RealTimeDataWriter, DisposableBean {

protected boolean serverAvailable;
protected volatile boolean serverAvailable;

/**
* @return data Whether the storage is available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* Store and collect real-time data - memory
*/
@Component
@ConditionalOnProperty(prefix = "warehouse.store.memory", name = "enabled", havingValue = "true", matchIfMissing = true)
@ConditionalOnProperty(prefix = "warehouse.real-time.memory", name = "enabled", havingValue = "true", matchIfMissing = true)
@Slf4j
public class MemoryDataStorage extends AbstractRealTimeDataStorage {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* @param enabled Whether memory data storage is enabled
* @param initSize Memory storage map initialization size
*/
@ConfigurationProperties(prefix = "warehouse.store.memory")
@ConfigurationProperties(prefix = "warehouse.real-time.memory")
public record MemoryProperties(@DefaultValue("true") boolean enabled,
@DefaultValue("1024") Integer initSize) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,13 @@

package org.apache.hertzbeat.warehouse.store.realtime.redis;

import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.api.sync.RedisCommands;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.warehouse.store.realtime.AbstractRealTimeDataStorage;
import org.apache.hertzbeat.warehouse.store.realtime.redis.client.RedisCommandDelegate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Primary;
import org.springframework.lang.NonNull;
Expand All @@ -40,35 +34,26 @@
*/
@Primary
@Component
@ConditionalOnProperty(prefix = "warehouse.store.redis", name = "enabled", havingValue = "true")
@ConditionalOnProperty(prefix = "warehouse.real-time.redis", name = "enabled", havingValue = "true")
@Slf4j
public class RedisDataStorage extends AbstractRealTimeDataStorage {

private RedisClient redisClient;
private final Integer db;
private StatefulRedisConnection<String, CollectRep.MetricsData> connection;
private final RedisCommandDelegate redisCommandDelegate;

public RedisDataStorage(RedisProperties redisProperties) {
this.serverAvailable = initRedisClient(redisProperties);
this.db = getRedisSelectDb(redisProperties);
}

private Integer getRedisSelectDb(RedisProperties redisProperties){
return redisProperties.db();
final RedisCommandDelegate delegate = RedisCommandDelegate.getInstance();
this.serverAvailable = delegate.initRedisClient(redisProperties);
this.redisCommandDelegate = delegate;
}

@Override
public CollectRep.MetricsData getCurrentMetricsData(@NonNull Long monitorId, @NonNull String metric) {
RedisCommands<String, CollectRep.MetricsData> commands = connection.sync();
commands.select(db);
return commands.hget(String.valueOf(monitorId), metric);
return redisCommandDelegate.operate().hget(String.valueOf(monitorId), metric);
}

@Override
public List<CollectRep.MetricsData> getCurrentMetricsData(@NonNull Long monitorId) {
RedisCommands<String, CollectRep.MetricsData> commands = connection.sync();
commands.select(db);
Map<String, CollectRep.MetricsData> metricsDataMap = commands.hgetall(String.valueOf(monitorId));
Map<String, CollectRep.MetricsData> metricsDataMap = redisCommandDelegate.operate().hgetAll(String.valueOf(monitorId));
return new ArrayList<>(metricsDataMap.values());
}

Expand All @@ -79,47 +64,20 @@ public void saveData(CollectRep.MetricsData metricsData) {
if (metricsData.getCode() != CollectRep.Code.SUCCESS || !isServerAvailable()) {
return;
}

RedisAsyncCommands<String, CollectRep.MetricsData> commands = connection.async();
commands.select(db);
commands.hset(key, hashKey, metricsData).thenAccept(response -> {
if (response) {
log.debug("[warehouse] redis add new data {}:{}.", key, hashKey);
} else {
log.debug("[warehouse] redis replace data {}:{}.", key, hashKey);
}
});
}

private boolean initRedisClient(RedisProperties redisProperties) {
if (redisProperties == null) {
log.error("init error, please config Warehouse redis props in application.yml");
return false;
}
RedisURI.Builder uriBuilder = RedisURI.builder()
.withHost(redisProperties.host())
.withPort(redisProperties.port())
.withTimeout(Duration.of(10, ChronoUnit.SECONDS));
if (redisProperties.password() != null && !"".equals(redisProperties.password())) {
uriBuilder.withPassword(redisProperties.password().toCharArray());
}
try {
redisClient = RedisClient.create(uriBuilder.build());
connection = redisClient.connect(new MetricsDataRedisCodec());
return true;
} catch (Exception e) {
log.error("init redis error {}", e.getMessage(), e);
}
return false;
redisCommandDelegate.operate().hset(key, hashKey, metricsData, future -> {
future.thenAccept(response -> {
if (response) {
log.debug("[warehouse] redis add new data {}:{}.", key, hashKey);
} else {
log.debug("[warehouse] redis replace data {}:{}.", key, hashKey);
}
});
});
}

@Override
public void destroy() {
if (connection != null) {
connection.close();
}
if (redisClient != null) {
redisClient.shutdown();
}
public void destroy() throws Exception {
redisCommandDelegate.destroy();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
/**
* Redis configuration information
*/
@ConfigurationProperties(prefix = "warehouse.store.redis")
@ConfigurationProperties(prefix = "warehouse.real-time.redis")
public record RedisProperties(@DefaultValue("false") boolean enabled,
@DefaultValue("127.0.0.1") String host,
@DefaultValue("6379") Integer port,
@DefaultValue("single") String mode,
@DefaultValue("127.0.0.1:6379") String address,
String masterName,
String password,
@DefaultValue("0") Integer db) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hertzbeat.warehouse.store.realtime.redis.client;

import io.lettuce.core.RedisFuture;
import io.lettuce.core.codec.RedisCodec;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.hertzbeat.warehouse.store.realtime.redis.RedisProperties;

/**
* Redis Client Operation
*/
public interface RedisClientOperation<K, V> extends AutoCloseable {
RedisClientOperation<K, V> connect(RedisProperties redisProperties, RedisCodec<K, V> redisCodec);

V hget(K key, K field);

Map<K, V> hgetAll(K key);

void hset(K key, K field, V value, Consumer<RedisFuture<Boolean>> redisFutureConsumer);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hertzbeat.warehouse.store.realtime.redis.client;

import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.warehouse.store.realtime.redis.MetricsDataRedisCodec;
import org.apache.hertzbeat.warehouse.store.realtime.redis.RedisProperties;
import org.apache.hertzbeat.warehouse.store.realtime.redis.client.impl.RedisClusterClientImpl;
import org.apache.hertzbeat.warehouse.store.realtime.redis.client.impl.RedisSentinelClientImpl;
import org.apache.hertzbeat.warehouse.store.realtime.redis.client.impl.RedisSimpleClientImpl;

/**
* Redis command delegate
*/
@Slf4j
public class RedisCommandDelegate {
private static final String SINGLE_MODE = "single";
private static final String SENTINEL_MODE = "sentinel";
private static final String CLUSTER_MODE = "cluster";
private static final RedisCommandDelegate INSTANCE = new RedisCommandDelegate();
private RedisClientOperation<String, CollectRep.MetricsData> operation;

public static RedisCommandDelegate getInstance() {
return INSTANCE;
}

public RedisClientOperation<String, CollectRep.MetricsData> operate() {
return operation;
}

public void destroy() throws Exception {
operation.close();
}

public boolean initRedisClient(RedisProperties redisProperties) {
if (redisProperties == null) {
log.error("init error, please config Warehouse redis props in application.yml");
return false;
}

try {
operation = switch (redisProperties.mode()) {
case SINGLE_MODE -> new RedisSimpleClientImpl().connect(redisProperties, new MetricsDataRedisCodec());
case SENTINEL_MODE -> new RedisSentinelClientImpl().connect(redisProperties, new MetricsDataRedisCodec());
case CLUSTER_MODE -> new RedisClusterClientImpl().connect(redisProperties, new MetricsDataRedisCodec());
default -> throw new UnsupportedOperationException("Incorrect redis mode: " + redisProperties.mode());
};

return true;
} catch (Exception e) {
log.error("init redis error {}", e.getMessage(), e);
}

return false;
}

private RedisCommandDelegate() {
}
}
Loading

0 comments on commit 712c541

Please sign in to comment.