diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParser.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParser.java index e79d44e4dae9..d96aae430742 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParser.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParser.java @@ -36,46 +36,78 @@ @NotThreadSafe public class TaskOutputParameterParser { - private final Map taskOutputParams = new HashMap<>(); + // Used to avoid '${setValue(' which loss the end of ')}' + private final int maxOneParameterRows; + + // Used to avoid '${setValue(' which length is too long, this may case OOM + private final int maxOneParameterLength; + + private final Map taskOutputParams; private List currentTaskOutputParam; - public void appendParseLog(String log) { - if (log == null) { + private long currentTaskOutputParamLength; + + public TaskOutputParameterParser() { + // the default max rows of one parameter is 1024, this should be enough + this(1024, Integer.MAX_VALUE); + } + + public TaskOutputParameterParser(int maxOneParameterRows, int maxOneParameterLength) { + this.maxOneParameterRows = maxOneParameterRows; + this.maxOneParameterLength = maxOneParameterLength; + this.taskOutputParams = new HashMap<>(); + this.currentTaskOutputParam = null; + this.currentTaskOutputParamLength = 0; + } + + public void appendParseLog(String logLine) { + if (logLine == null) { return; } if (currentTaskOutputParam != null) { + if (currentTaskOutputParam.size() > maxOneParameterRows + || currentTaskOutputParamLength > maxOneParameterLength) { + log.warn( + "The output param expression '{}' is too long, the max rows is {}, max length is {}, will skip this param", + String.join("\n", currentTaskOutputParam), maxOneParameterLength, maxOneParameterRows); + currentTaskOutputParam = null; + currentTaskOutputParamLength = 0; + return; + } // continue to parse the rest of line - int i = log.indexOf(")}"); + int i = logLine.indexOf(")}"); if (i == -1) { // the end of var pool not found - currentTaskOutputParam.add(log); + currentTaskOutputParam.add(logLine); + currentTaskOutputParamLength += logLine.length(); } else { // the end of var pool found - currentTaskOutputParam.add(log.substring(0, i + 2)); + currentTaskOutputParam.add(logLine.substring(0, i + 2)); Pair keyValue = parseOutputParam(String.join("\n", currentTaskOutputParam)); if (keyValue.getKey() != null && keyValue.getValue() != null) { taskOutputParams.put(keyValue.getKey(), keyValue.getValue()); } currentTaskOutputParam = null; + currentTaskOutputParamLength = 0; // continue to parse the rest of line - if (i + 2 != log.length()) { - appendParseLog(log.substring(i + 2)); + if (i + 2 != logLine.length()) { + appendParseLog(logLine.substring(i + 2)); } } return; } - int indexOfVarPoolBegin = log.indexOf("${setValue("); + int indexOfVarPoolBegin = logLine.indexOf("${setValue("); if (indexOfVarPoolBegin == -1) { - indexOfVarPoolBegin = log.indexOf("#{setValue("); + indexOfVarPoolBegin = logLine.indexOf("#{setValue("); } if (indexOfVarPoolBegin == -1) { return; } currentTaskOutputParam = new ArrayList<>(); - appendParseLog(log.substring(indexOfVarPoolBegin)); + appendParseLog(logLine.substring(indexOfVarPoolBegin)); } public Map getTaskOutputParams() { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParserTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParserTest.java index 950ed822c0dd..25c1b84ae179 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParserTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParserTest.java @@ -19,14 +19,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals; -import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; +import lombok.SneakyThrows; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -35,7 +36,7 @@ class TaskOutputParameterParserTest { @Test - void testEmptyLog() throws IOException, URISyntaxException { + void testEmptyLog() { List varPools = getLogs("/outputParam/emptyVarPoolLog.txt"); TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser(); varPools.forEach(taskOutputParameterParser::appendParseLog); @@ -43,7 +44,7 @@ void testEmptyLog() throws IOException, URISyntaxException { } @Test - void testOneLineLog() throws IOException, URISyntaxException { + void testOneLineLog() { List varPools = getLogs("/outputParam/onelineVarPoolLog.txt"); TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser(); varPools.forEach(taskOutputParameterParser::appendParseLog); @@ -51,7 +52,7 @@ void testOneLineLog() throws IOException, URISyntaxException { } @Test - void testOneVarPollInMultiLineLog() throws IOException, URISyntaxException { + void testOneVarPoolInMultiLineLog() { List varPools = getLogs("/outputParam/oneVarPollInMultiLineLog.txt"); TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser(); varPools.forEach(taskOutputParameterParser::appendParseLog); @@ -63,14 +64,46 @@ void testOneVarPollInMultiLineLog() throws IOException, URISyntaxException { } @Test - void testVarPollInMultiLineLog() throws IOException, URISyntaxException { - List varPools = getLogs("/outputParam/multipleVarPoll.txt"); + void testVarPoolInMultiLineLog() { + List varPools = getLogs("/outputParam/multipleVarPool.txt"); TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser(); varPools.forEach(taskOutputParameterParser::appendParseLog); assertEquals(ImmutableMap.of("name", "tom", "age", "1"), taskOutputParameterParser.getTaskOutputParams()); } - private List getLogs(String file) throws IOException, URISyntaxException { + @Test + void textVarPoolExceedMaxRows() { + List varPools = getLogs("/outputParam/maxRowsVarPool.txt"); + TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser(2, Integer.MAX_VALUE); + varPools.forEach(taskOutputParameterParser::appendParseLog); + assertEquals(Collections.emptyMap(), taskOutputParameterParser.getTaskOutputParams()); + + taskOutputParameterParser = new TaskOutputParameterParser(); + varPools.forEach(taskOutputParameterParser::appendParseLog); + assertEquals(ImmutableMap.of("name", "name=tom\n" + + "name=name=tom\n" + + "name=name=tom\n" + + "name=name=tom\n" + + "name=name=tom"), taskOutputParameterParser.getTaskOutputParams()); + + } + + @Test + void textVarPoolExceedMaxLength() { + List varPools = getLogs("/outputParam/maxLengthVarPool.txt"); + TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser(2, 10); + varPools.forEach(taskOutputParameterParser::appendParseLog); + assertEquals(Collections.emptyMap(), taskOutputParameterParser.getTaskOutputParams()); + + taskOutputParameterParser = new TaskOutputParameterParser(); + varPools.forEach(taskOutputParameterParser::appendParseLog); + assertEquals(ImmutableMap.of("name", "123456789\n" + + "12345\n"), taskOutputParameterParser.getTaskOutputParams()); + + } + + @SneakyThrows + private List getLogs(String file) { URI uri = TaskOutputParameterParserTest.class.getResource(file).toURI(); return Files.lines(Paths.get(uri)).collect(Collectors.toList()); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/maxLengthVarPool.txt b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/maxLengthVarPool.txt new file mode 100644 index 000000000000..6aebb51a1dbe --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/maxLengthVarPool.txt @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +${setValue(name=123456789 +12345 +)} \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/maxRowsVarPool.txt b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/maxRowsVarPool.txt new file mode 100644 index 000000000000..0fea43da8d31 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/maxRowsVarPool.txt @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +${setValue(name=name=tom +name=name=tom +name=name=tom +name=name=tom +name=name=tom)} \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPool.txt similarity index 100% rename from dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt rename to dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPool.txt