Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis fine tuning #8

Merged
merged 4 commits into from
Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions vertx/example-configs/config-depl.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@
"redisMode": "CLUSTER",
"redisUsername": "xyz",
"redisPassword": "abcd",
"redisMaxPoolSize": 30,
"redisMaxPoolWaiting": 200,
"redisMaxWaitingHandlers": 1024,
"redisPoolRecycleTimeout": 1500,
"redisMaxWaitingHandlers": 5000,
"redisHost":"redis-redis-cluster.redis.svc.cluster.local",
"redisPort": 6379
},
Expand Down
5 changes: 1 addition & 4 deletions vertx/example-configs/config-dev.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
"redisMode": "CLUSTER",
"redisUsername": "xyz",
"redisPassword": "abcd",
"redisMaxPoolSize": 30,
"redisMaxPoolWaiting": 200,
"redisMaxWaitingHandlers": 1024,
"redisPoolRecycleTimeout": 1500,
"redisMaxWaitingHandlers": 5000,
"redisHost":"redis-redis-cluster.redis.svc.cluster.local",
"redisPort": 6379
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void start() throws Exception {

private Map<String, Object> loadMappingFile() {
try {
String config = new String(Files.readAllBytes(Paths.get("secrets/all-verticles-configs/attribute-mapping.json")),
String config = new String(Files.readAllBytes(Paths.get("secrets/attribute-mapping.json")),
StandardCharsets.UTF_8);
JsonObject json = new JsonObject(config);
return json.getMap();
Expand Down
26 changes: 13 additions & 13 deletions vertx/src/main/java/iudx/ingestion/pipeline/redis/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,23 @@ public RedisClient(Vertx vertx, JsonObject config) {
RedisOptions options;
RedisURI.append("redis://").append(config.getString("redisUsername")).append(":")
.append(config.getString("redisPassword")).append("@").append(config.getString("redisHost"))
.append(":")
.append(config.getInteger("redisPort").toString());
.append(":").append(config.getInteger("redisPort").toString());
String mode = config.getString("redisMode");
if (mode.equals("CLUSTER")) {
options =
new RedisOptions().setType(RedisClientType.CLUSTER).setUseSlave(RedisSlaves.SHARE);
options = new RedisOptions().setType(RedisClientType.CLUSTER).setUseSlave(RedisSlaves.NEVER);
} else if (mode.equals("STANDALONE")) {
options =
new RedisOptions().setType(RedisClientType.STANDALONE);
options = new RedisOptions().setType(RedisClientType.STANDALONE);
} else {
LOGGER.error("Invalid/Unsupported mode");
return;
}
options.setMaxPoolSize(config.getInteger("redisMaxPoolSize"))
.setMaxPoolWaiting(config.getInteger("redisMaxPoolWaiting"))
.setMaxWaitingHandlers(config.getInteger("redisMaxWaitingHandlers"))
.setPoolRecycleTimeout(config.getInteger("redisPoolRecycleTimeout"))
options.setMaxWaitingHandlers(config.getInteger("redisMaxWaitingHandlers"))
.setConnectionString(RedisURI.toString());

ClusteredClient = Redis.createClient(vertx, options);
redis = RedisAPI.api(ClusteredClient);

ClusteredClient.connect(conn -> {
redis = RedisAPI.api(conn.result());
});
}

public Future<JsonObject> get(String key) {
Expand All @@ -59,7 +54,7 @@ public Future<JsonObject> get(String key) {
public Future<JsonObject> get(String key, String path) {
Promise<JsonObject> promise = Promise.promise();
redis.send(JSONGET, key, path).onFailure(res -> {
promise.fail(String.format("JSONGET did not work: %s", res.getCause()));
promise.fail(String.format("JSONGET did not work: %s", res.getMessage()));
}).onSuccess(redisResponse -> {
if (redisResponse == null) {
promise.fail(String.format(" %s key not found", key));
Expand All @@ -83,4 +78,9 @@ public Future<Boolean> put(String key, String path, String data) {
return promise.future();
}

public void close() {
redis.close();

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ public RedisServiceImpl(RedisClient redisClient) {

@Override
public RedisService get(String key, String path, Handler<AsyncResult<JsonObject>> handler) {
redisClient.get(key, path)
.onSuccess(successHandler -> {
handler.handle(Future.succeededFuture(successHandler));
}).onFailure(failureHandler -> {
handler.handle(Future.failedFuture(failureHandler));
});
redisClient.get(key, path).onSuccess(successHandler -> {
handler.handle(Future.succeededFuture(successHandler));
}).onFailure(failureHandler -> {
handler.handle(Future.failedFuture(failureHandler));
});
return this;
}

Expand All @@ -56,71 +55,19 @@ public RedisService put(String key, String path, String data, Handler<AsyncResul

JsonObject response = new JsonObject().put("result", "published");

redisClient.get(key, pathParam.toString()).onComplete(redisHandler -> {
if (redisHandler.succeeded()) {
// key found
LOGGER.debug("key found : ");
JsonObject fromRedis = redisHandler.result();
LOGGER.debug("data : " + fromRedis.toString());
if (isValidMessage2Push(fromRedis, new JsonObject(data))) {

redisClient.put(key, pathParam.toString(), data).onComplete(res -> {
if (res.failed()) {
LOGGER.error(res.cause());
} else {
handler.handle(Future.succeededFuture(response));
}
});
} else {
LOGGER.info("message rejected for being older than json in redis.");
handler.handle(Future.failedFuture("message rejected for being older than json in redis."));
}
} else {
LOGGER.debug("key not found : " + key);
redisClient.put(key, pathParam.toString(), data).onComplete(res -> {
if (res.failed()) {
LOGGER.error(res.cause());
handler.handle(Future.failedFuture("failed to publish message."));
} else {
handler.handle(Future.succeededFuture(response));
}
});
}
});

} else {
handler.handle(Future.failedFuture("null/empty message rejected."));
}
return this;
}


/**
* check if message is valid through 'ObservationDateTime' field's in both messages.
*
* @param fromRedis Json from Redis Cache.
* @param latestJson Json from EB.
* @return
*/
private boolean isValidMessage2Push(JsonObject fromRedis, JsonObject latestJson) {
String dateFromRedisData = fromRedis.getString("observationDateTime");
String dateFromLatestData = latestJson.getString("observationDateTime");
boolean result = false;
LOGGER.debug("from Redis : " + dateFromRedisData + " from Latest : " + dateFromLatestData);
try {
LocalDateTime fromRedisData = LocalDateTime.parse(dateFromRedisData);
LocalDateTime fromLatestData = LocalDateTime.parse(dateFromLatestData);

if (fromLatestData.isAfter(fromRedisData)) {
LOGGER.info(result);
result = true;
}
} catch (DateTimeParseException e) {
LOGGER.error("parse exception : " + e.getMessage());
result = false;
}
return result;
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ public class RedisVerticle extends AbstractVerticle {
private ServiceBinder binder;
private MessageConsumer<JsonObject> consumer;

private RedisClient client;

@Override
public void start() throws Exception {
RedisClient client = new RedisClient(vertx, config());
client = new RedisClient(vertx, config());

redisService = new RedisServiceImpl(client);
binder = new ServiceBinder(vertx);
Expand All @@ -28,4 +29,7 @@ public void start() throws Exception {

}

public void stop() {
client.close();
}
}