From 0866e69e4ba4fd43fb95882642a6d8159ef11db0 Mon Sep 17 00:00:00 2001 From: lightzhao Date: Mon, 26 Jun 2023 18:24:13 +0800 Subject: [PATCH 1/5] Support redis to set expiration time. --- docs/en/connector-v2/sink/Redis.md | 5 ++++ .../seatunnel/redis/config/RedisConfig.java | 6 +++++ .../seatunnel/redis/config/RedisDataType.java | 23 ++++++++++++++----- .../redis/config/RedisParameters.java | 4 ++++ .../redis/sink/RedisSinkFactory.java | 3 ++- .../seatunnel/redis/sink/RedisSinkWriter.java | 3 ++- 6 files changed, 36 insertions(+), 8 deletions(-) diff --git a/docs/en/connector-v2/sink/Redis.md b/docs/en/connector-v2/sink/Redis.md index 9f9215d1b48..0f744db0ec0 100644 --- a/docs/en/connector-v2/sink/Redis.md +++ b/docs/en/connector-v2/sink/Redis.md @@ -23,6 +23,7 @@ Used to write data to Redis. | mode | string | no | single | | nodes | list | yes when mode=cluster | - | | format | string | no | json | +| expire | long | no | - | | common-options | | no | - | ### host [string] @@ -120,6 +121,10 @@ Connector will generate data as the following and write it to redis: ``` +### expire [long] + +Set redis expiration time, the unit is second, not set by default. + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java index c777d237827..72e31433ad3 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java @@ -102,6 +102,12 @@ public enum HashKeyParseMode { .withDescription( "hash key parse mode, support all or kv, default value is all"); + public static final Option EXPIRE = + Options.key("expire") + .longType() + .noDefaultValue() + .withDescription("Set redis expiration time."); + public enum Format { JSON, // TEXT will be supported later diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java index 64772b5381d..a315e0cdae0 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java @@ -30,8 +30,9 @@ public enum RedisDataType { KEY { @Override - public void set(Jedis jedis, String key, String value) { + public void set(Jedis jedis, String key, String value, long expire) { jedis.set(key, value); + expire(jedis, key, expire); } @Override @@ -41,9 +42,10 @@ public List get(Jedis jedis, String key) { }, HASH { @Override - public void set(Jedis jedis, String key, String value) { + public void set(Jedis jedis, String key, String value, long expire) { Map fieldsMap = JsonUtils.toMap(value); jedis.hset(key, fieldsMap); + expire(jedis, key, expire); } @Override @@ -54,8 +56,9 @@ public List get(Jedis jedis, String key) { }, LIST { @Override - public void set(Jedis jedis, String key, String value) { + public void set(Jedis jedis, String key, String value, long expire) { jedis.lpush(key, value); + expire(jedis, key, expire); } @Override @@ -65,8 +68,9 @@ public List get(Jedis jedis, String key) { }, SET { @Override - public void set(Jedis jedis, String key, String value) { + public void set(Jedis jedis, String key, String value, long expire) { jedis.sadd(key, value); + expire(jedis, key, expire); } @Override @@ -77,8 +81,9 @@ public List get(Jedis jedis, String key) { }, ZSET { @Override - public void set(Jedis jedis, String key, String value) { + public void set(Jedis jedis, String key, String value, long expire) { jedis.zadd(key, 1, value); + expire(jedis, key, expire); } @Override @@ -91,7 +96,13 @@ public List get(Jedis jedis, String key) { return Collections.emptyList(); } - public void set(Jedis jedis, String key, String value) { + private static void expire(Jedis jedis, String key, long expire) { + if (expire > 0) { + jedis.expire(key, expire); + } + } + + public void set(Jedis jedis, String key, String value, long expire) { // do nothing } } diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java index c8bb879d0f5..8d3e778ec9c 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java @@ -47,6 +47,7 @@ public class RedisParameters implements Serializable { private RedisConfig.RedisMode mode; private RedisConfig.HashKeyParseMode hashKeyParseMode; private List redisNodes = Collections.emptyList(); + private long expire = 0; public void buildWithConfig(Config config) { // set host @@ -89,6 +90,9 @@ public void buildWithConfig(Config config) { if (config.hasPath(RedisConfig.KEY_PATTERN.key())) { this.keysPattern = config.getString(RedisConfig.KEY_PATTERN.key()); } + if (config.hasPath(RedisConfig.EXPIRE.key())) { + this.expire = config.getLong(RedisConfig.EXPIRE.key()); + } // set redis data type try { String dataType = config.getString(RedisConfig.DATA_TYPE.key()); diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java index e68a893f79c..22ae1568740 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java @@ -41,7 +41,8 @@ public OptionRule optionRule() { RedisConfig.AUTH, RedisConfig.USER, RedisConfig.KEY_PATTERN, - RedisConfig.FORMAT) + RedisConfig.FORMAT, + RedisConfig.EXPIRE) .conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER, RedisConfig.NODES) .build(); } diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java index 657e3aaa565..80b1449b9d6 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java @@ -59,7 +59,8 @@ public void write(SeaTunnelRow element) throws IOException { } else { key = keyField; } - redisDataType.set(jedis, key, data); + long expire = redisParameters.getExpire(); + redisDataType.set(jedis, key, data, expire); } @Override From 3f109475705cd36796ff13f16a513a43c00638db Mon Sep 17 00:00:00 2001 From: lightzhao Date: Wed, 28 Jun 2023 11:36:09 +0800 Subject: [PATCH 2/5] Set redis expire default value. --- docs/en/connector-v2/sink/Redis.md | 4 ++-- .../connectors/seatunnel/redis/config/RedisConfig.java | 2 +- .../connectors/seatunnel/redis/config/RedisParameters.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/en/connector-v2/sink/Redis.md b/docs/en/connector-v2/sink/Redis.md index 0f744db0ec0..8243ee3e8fc 100644 --- a/docs/en/connector-v2/sink/Redis.md +++ b/docs/en/connector-v2/sink/Redis.md @@ -23,7 +23,7 @@ Used to write data to Redis. | mode | string | no | single | | nodes | list | yes when mode=cluster | - | | format | string | no | json | -| expire | long | no | - | +| expire | long | no | -1 | | common-options | | no | - | ### host [string] @@ -123,7 +123,7 @@ Connector will generate data as the following and write it to redis: ### expire [long] -Set redis expiration time, the unit is second, not set by default. +Set redis expiration time, the unit is second. The default value is -1, keys do not automatically expire by default. ### common options diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java index 72e31433ad3..511cbe4aa99 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java @@ -105,7 +105,7 @@ public enum HashKeyParseMode { public static final Option EXPIRE = Options.key("expire") .longType() - .noDefaultValue() + .defaultValue(-1L) .withDescription("Set redis expiration time."); public enum Format { diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java index 8d3e778ec9c..8954b4da2a1 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java @@ -47,7 +47,7 @@ public class RedisParameters implements Serializable { private RedisConfig.RedisMode mode; private RedisConfig.HashKeyParseMode hashKeyParseMode; private List redisNodes = Collections.emptyList(); - private long expire = 0; + private long expire = RedisConfig.EXPIRE.defaultValue(); public void buildWithConfig(Config config) { // set host From ca138b485c27f2f541146fd87b659922253eec10 Mon Sep 17 00:00:00 2001 From: lightzhao Date: Mon, 17 Jul 2023 14:24:52 +0800 Subject: [PATCH 3/5] add e2e test. --- .../e2e/connector/redis/RedisIT.java | 11 ++++ .../src/test/resources/redis-to-redis-1.conf | 50 +++++++++++++++++++ 2 files changed, 61 insertions(+) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-1.conf diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java index 808f6860337..0ba5f71eeee 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java @@ -192,4 +192,15 @@ public void testRedis(TestContainer container) throws IOException, InterruptedEx jedis.del("key_list"); Assertions.assertEquals(0, jedis.llen("key_list")); } + + @TestTemplate + public void testRedisWithExpire(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/redis-to-redis-1.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + Assertions.assertEquals(100, jedis.llen("key_list")); + // Clear data to prevent data duplication in the next TestContainer + jedis.del("key_list"); + Assertions.assertEquals(0, jedis.llen("key_list")); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-1.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-1.conf new file mode 100644 index 00000000000..0c24c41a711 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-1.conf @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + keys = "key_test*" + data_type = key + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "key_list" + data_type = list + expire = 300 + } +} \ No newline at end of file From ec3f821e5cd45f27d6a71ec2d1ad5afcc46d6330 Mon Sep 17 00:00:00 2001 From: lightzhao Date: Mon, 17 Jul 2023 14:40:43 +0800 Subject: [PATCH 4/5] add e2e test. --- .../java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java | 2 +- .../src/test/resources/redis-to-redis-1.conf | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java index 0ba5f71eeee..962d3e2c57b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java @@ -200,7 +200,7 @@ public void testRedisWithExpire(TestContainer container) Assertions.assertEquals(0, execResult.getExitCode()); Assertions.assertEquals(100, jedis.llen("key_list")); // Clear data to prevent data duplication in the next TestContainer - jedis.del("key_list"); + Thread.sleep(60 * 1000); Assertions.assertEquals(0, jedis.llen("key_list")); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-1.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-1.conf index 0c24c41a711..4a42bd3a46a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-1.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-1.conf @@ -45,6 +45,6 @@ sink { auth = "U2VhVHVubmVs" key = "key_list" data_type = list - expire = 300 + expire = 30 } } \ No newline at end of file From f9e6381f65d46dcd3ee21df90d9ea7112565f130 Mon Sep 17 00:00:00 2001 From: lightzhao Date: Wed, 19 Jul 2023 18:42:08 +0800 Subject: [PATCH 5/5] modify config file name. --- .../java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java | 2 +- .../{redis-to-redis-1.conf => redis-to-redis-expire.conf} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/{redis-to-redis-1.conf => redis-to-redis-expire.conf} (100%) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java index 962d3e2c57b..bd4a9063ba1 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java @@ -196,7 +196,7 @@ public void testRedis(TestContainer container) throws IOException, InterruptedEx @TestTemplate public void testRedisWithExpire(TestContainer container) throws IOException, InterruptedException { - Container.ExecResult execResult = container.executeJob("/redis-to-redis-1.conf"); + Container.ExecResult execResult = container.executeJob("/redis-to-redis-expire.conf"); Assertions.assertEquals(0, execResult.getExitCode()); Assertions.assertEquals(100, jedis.llen("key_list")); // Clear data to prevent data duplication in the next TestContainer diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-1.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf similarity index 100% rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-1.conf rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf