From a5f0e90ace94971cb34febb407f474526a658dba Mon Sep 17 00:00:00 2001 From: zhengyuan Date: Thu, 25 May 2023 14:08:55 +0800 Subject: [PATCH 1/8] [Bug] connector-influxdb module InfluxDBSource class's initColumnsIndex method, sql invalide when sql contains tz function cause by direct append QUERY_LIMIT #4231 --- .../influxdb/source/InfluxDBSource.java | 23 ++++++- .../e2e/connector/influxdb/InfluxdbIT.java | 32 ++++++++++ .../influxdb-to-influxdb-sqltest.conf | 60 +++++++++++++++++++ 3 files changed, 114 insertions(+), 1 deletion(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-sqltest.conf diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java index 14cae9a77a3..aab7610fd95 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java @@ -50,6 +50,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL; @@ -127,7 +129,16 @@ public SourceSplitEnumerator restoreEn private List initColumnsIndex(InfluxDB influxdb) { // query one row to get column info - String query = sourceConfig.getSql() + QUERY_LIMIT; + String sql = sourceConfig.getSql(); + String query = sql + QUERY_LIMIT; + // if sql contains tz(), can't be append QUERY_LIMIT at last . see bug #4231 + int start = containTzFunction(sql.toLowerCase()); + if (start > 0) { + StringBuilder tmpSql = new StringBuilder(sql); + tmpSql.insert(start - 1, QUERY_LIMIT).append(" "); + query = tmpSql.toString(); + } + try { QueryResult queryResult = influxdb.query(new Query(query, sourceConfig.getDatabase())); @@ -144,4 +155,14 @@ private List initColumnsIndex(InfluxDB influxdb) { e); } } + + private static int containTzFunction(String sql) { + Pattern pattern = Pattern.compile("tz\\(.*\\)"); + Matcher matcher = pattern.matcher(sql); + if (matcher.find()) { + int start = matcher.start(); + return start; + } + return -1; + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java index d6667966220..1bf28e91763 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java @@ -207,6 +207,38 @@ public void testInfluxdb(TestContainer container) throws IOException, Interrupte } } } + + @TestTemplate + public void testInfluxdbSqlTest(TestContainer container) throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/influxdb-to-influxdb-sqltest.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + String sourceSql = + String.format("select * from %s order by time", INFLUXDB_SOURCE_MEASUREMENT); + String sinkSql = String.format("select * from %s order by time", INFLUXDB_SINK_MEASUREMENT); + QueryResult sourceQueryResult = influxDB.query(new Query(sourceSql, INFLUXDB_DATABASE)); + QueryResult sinkQueryResult = influxDB.query(new Query(sinkSql, INFLUXDB_DATABASE)); + // assert data count + Assertions.assertEquals( + sourceQueryResult.getResults().size(), sinkQueryResult.getResults().size()); + // assert data values + List> sourceValues = + sourceQueryResult.getResults().get(0).getSeries().get(0).getValues(); + List> sinkValues = + sinkQueryResult.getResults().get(0).getSeries().get(0).getValues(); + int rowSize = sourceValues.size(); + int colSize = sourceValues.get(0).size(); + + for (int row = 0; row < rowSize; row++) { + for (int col = 0; col < colSize; col++) { + Object sourceColValue = sourceValues.get(row).get(col); + Object sinkColValue = sinkValues.get(row).get(col); + + if (!Objects.deepEquals(sourceColValue, sinkColValue)) { + Assertions.assertEquals(sourceColValue, sinkColValue); + } + } + } + } private void initializeInfluxDBClient() throws ConnectException { InfluxDBConfig influxDBConfig = new InfluxDBConfig(influxDBConnectUrl); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-sqltest.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-sqltest.conf new file mode 100644 index 00000000000..1fb505c96cf --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-sqltest.conf @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + InfluxDB { + url = "http://influxdb-host:8086" + sql = "select label, c_string, c_double, c_bigint, c_float, c_int, c_smallint, c_boolean from source tz('Asia/Shanghai')" + database = "test" + upper_bound = 99 + lower_bound = 0 + partition_num = 4 + split_column = "c_int" + schema { + fields { + label = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + time = BIGINT + } + } + } +} + +transform { +} + +sink { + InfluxDB { + url = "http://influxdb-host:8086" + database = "test" + measurement = "sink" + key_time = "time" + key_tags = ["label"] + batch_size = 1 + } +} \ No newline at end of file From f6db09e13aa6368d274a936d1d5ddcd36522c210 Mon Sep 17 00:00:00 2001 From: zhengyuan Date: Mon, 7 Aug 2023 09:08:32 +0800 Subject: [PATCH 2/8] format code style --- .../seatunnel/e2e/connector/influxdb/InfluxdbIT.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java index 1bf28e91763..b7366983bdc 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java @@ -207,10 +207,12 @@ public void testInfluxdb(TestContainer container) throws IOException, Interrupte } } } - + @TestTemplate - public void testInfluxdbSqlTest(TestContainer container) throws IOException, InterruptedException { - Container.ExecResult execResult = container.executeJob("/influxdb-to-influxdb-sqltest.conf"); + public void testInfluxdbSqlTest(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/influxdb-to-influxdb-sqltest.conf"); Assertions.assertEquals(0, execResult.getExitCode()); String sourceSql = String.format("select * from %s order by time", INFLUXDB_SOURCE_MEASUREMENT); From 049c9170b17012c066e4c46ff3eb2997f2093408 Mon Sep 17 00:00:00 2001 From: zhengyuan Date: Tue, 8 Aug 2023 13:55:53 +0800 Subject: [PATCH 3/8] Standardize Test case method and conf file --- .../org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java | 2 +- ...-influxdb-sqltest.conf => influxdb-to-influxdb-with-tz.conf} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/{influxdb-to-influxdb-sqltest.conf => influxdb-to-influxdb-with-tz.conf} (100%) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java index b7366983bdc..5d90c774c6b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java @@ -209,7 +209,7 @@ public void testInfluxdb(TestContainer container) throws IOException, Interrupte } @TestTemplate - public void testInfluxdbSqlTest(TestContainer container) + public void testInfluxdbWithTz(TestContainer container) throws IOException, InterruptedException { Container.ExecResult execResult = container.executeJob("/influxdb-to-influxdb-sqltest.conf"); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-sqltest.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf similarity index 100% rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-sqltest.conf rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf From d7e6dbfd12ecf10f8a3312d5f7c361e1c2167784 Mon Sep 17 00:00:00 2001 From: zhengyuan Date: Tue, 8 Aug 2023 14:00:09 +0800 Subject: [PATCH 4/8] Standardize Test case method and conf file --- .../org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java index 5d90c774c6b..b7760d8be3f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java @@ -212,7 +212,7 @@ public void testInfluxdb(TestContainer container) throws IOException, Interrupte public void testInfluxdbWithTz(TestContainer container) throws IOException, InterruptedException { Container.ExecResult execResult = - container.executeJob("/influxdb-to-influxdb-sqltest.conf"); + container.executeJob("/influxdb-to-influxdb-with-tz.conf.conf"); Assertions.assertEquals(0, execResult.getExitCode()); String sourceSql = String.format("select * from %s order by time", INFLUXDB_SOURCE_MEASUREMENT); From cf001320cd9a91038af75a213e2d932ff0d68304 Mon Sep 17 00:00:00 2001 From: zhengyuan Date: Tue, 8 Aug 2023 14:03:21 +0800 Subject: [PATCH 5/8] Standardize Test case method and conf file --- .../org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java index b7760d8be3f..ddc7afadacb 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java @@ -212,7 +212,7 @@ public void testInfluxdb(TestContainer container) throws IOException, Interrupte public void testInfluxdbWithTz(TestContainer container) throws IOException, InterruptedException { Container.ExecResult execResult = - container.executeJob("/influxdb-to-influxdb-with-tz.conf.conf"); + container.executeJob("/influxdb-to-influxdb-with-tz.conf"); Assertions.assertEquals(0, execResult.getExitCode()); String sourceSql = String.format("select * from %s order by time", INFLUXDB_SOURCE_MEASUREMENT); From d873598cf1adc0361b846dc683c39b56c53ce691 Mon Sep 17 00:00:00 2001 From: zhengyuan Date: Fri, 27 Oct 2023 10:34:32 +0800 Subject: [PATCH 6/8] format influxdb-to-influxdb-with-tz.conf --- .../influxdb-to-influxdb-with-tz.conf | 62 +++++++++---------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf index 1fb505c96cf..204e069016a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf @@ -16,45 +16,45 @@ # env { - execution.parallelism = 1 - job.mode = "BATCH" + execution.parallelism = 1 + job.mode = "BATCH" } source { - InfluxDB { - url = "http://influxdb-host:8086" - sql = "select label, c_string, c_double, c_bigint, c_float, c_int, c_smallint, c_boolean from source tz('Asia/Shanghai')" - database = "test" - upper_bound = 99 - lower_bound = 0 - partition_num = 4 - split_column = "c_int" - schema { - fields { - label = STRING - c_string = STRING - c_double = DOUBLE - c_bigint = BIGINT - c_float = FLOAT - c_int = INT - c_smallint = SMALLINT - c_boolean = BOOLEAN - time = BIGINT - } - } + InfluxDB { + url = "http://influxdb-host:8086" + sql = "select label, c_string, c_double, c_bigint, c_float, c_int, c_smallint, c_boolean from source tz('Asia/Shanghai')" + database = "test" + upper_bound = 99 + lower_bound = 0 + partition_num = 4 + split_column = "c_int" + schema { + fields { + label = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + time = BIGINT + } } + } } transform { } sink { - InfluxDB { - url = "http://influxdb-host:8086" - database = "test" - measurement = "sink" - key_time = "time" - key_tags = ["label"] - batch_size = 1 - } + InfluxDB { + url = "http://influxdb-host:8086" + database = "test" + measurement = "sink" + key_time = "time" + key_tags = ["label"] + batch_size = 1 + } } \ No newline at end of file From 52b2613d5f7792f0cf04feaf3f7a542d55b0a731 Mon Sep 17 00:00:00 2001 From: zhengyuan Date: Fri, 27 Oct 2023 15:21:44 +0800 Subject: [PATCH 7/8] change test conf partition_num to 0 --- .../src/test/resources/influxdb-to-influxdb-with-tz.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf index 204e069016a..2b57639f35a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf @@ -27,7 +27,7 @@ source { database = "test" upper_bound = 99 lower_bound = 0 - partition_num = 4 + partition_num = 0 split_column = "c_int" schema { fields { From 177a57e793eb111bf8ba831eeae10a0cd6ac1e64 Mon Sep 17 00:00:00 2001 From: zhengyuan Date: Fri, 27 Oct 2023 17:01:58 +0800 Subject: [PATCH 8/8] delete partition conf , shuld not test mult partition --- .../src/test/resources/influxdb-to-influxdb-with-tz.conf | 4 ---- 1 file changed, 4 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf index 2b57639f35a..4b7666130da 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf @@ -25,10 +25,6 @@ source { url = "http://influxdb-host:8086" sql = "select label, c_string, c_double, c_bigint, c_float, c_int, c_smallint, c_boolean from source tz('Asia/Shanghai')" database = "test" - upper_bound = 99 - lower_bound = 0 - partition_num = 0 - split_column = "c_int" schema { fields { label = STRING