diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java index 6e333543512..c895794e4c6 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java @@ -30,6 +30,8 @@ import org.apache.commons.lang3.tuple.ImmutablePair; +import com.google.common.annotations.VisibleForTesting; + import java.net.URL; import java.util.ArrayList; import java.util.Collections; @@ -106,8 +108,9 @@ protected MultipleTableJobConfigParser getJobConfigParser() { isStartWithSavePoint); } + @VisibleForTesting @Override - protected LogicalDag getLogicalDag() { + public LogicalDag getLogicalDag() { ImmutablePair, Set> immutablePair = getJobConfigParser().parse(null); actions.addAll(immutablePair.getLeft()); // Enable upload connector jar package to engine server, automatically upload connector Jar diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java index f16f61a7f07..25eaf89a6cb 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java @@ -29,6 +29,7 @@ import org.apache.seatunnel.engine.common.config.ConfigProvider; import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; +import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; import org.apache.seatunnel.engine.core.job.JobDAGInfo; import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.engine.server.SeaTunnelNodeContext; @@ -47,10 +48,13 @@ import com.hazelcast.instance.impl.HazelcastInstanceFactory; import lombok.extern.slf4j.Slf4j; +import java.net.URL; import java.util.ArrayList; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT; import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS; @@ -399,6 +403,26 @@ public void testGetJobInfo() { } } + @Test + public void testJarsInEnvAddedToCommonJars() { + Common.setDeployMode(DeployMode.CLIENT); + String filePath = TestUtils.getResource("/client_test_with_jars.conf"); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName("client_test_with_jars"); + try (SeaTunnelClient seaTunnelClient = createSeaTunnelClient()) { + LogicalDag logicalDag = + seaTunnelClient + .createExecutionContext(filePath, jobConfig, SEATUNNEL_CONFIG) + .getLogicalDag(); + Assertions.assertIterableEquals( + Arrays.asList("file:/tmp/test.jar", "file:/tmp/test2.jar"), + logicalDag.getLogicalVertexMap().values().iterator().next().getAction() + .getJarUrls().stream() + .map(URL::toString) + .collect(Collectors.toList())); + } + } + @Test public void testSavePointAndRestoreWithSavePoint() throws Exception { Common.setDeployMode(DeployMode.CLIENT); diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test_with_jars.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test_with_jars.conf new file mode 100644 index 00000000000..77d6d5db852 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test_with_jars.conf @@ -0,0 +1,71 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" + jars = "file:///tmp/test.jar;file:///tmp/test2.jar" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = "fake" + schema = { + fields { + name = "string" + age = "int" + } + } + parallelism = 1 + } + + FakeSource { + result_table_name = "fake2" + schema = { + fields { + name = "string" + age = "int" + } + } + parallelism = 1 + } +} + +transform { +} + +sink { + LocalFile { + path="/tmp/hive/warehouse/test2" + field_delimiter="\t" + row_delimiter="\n" + partition_by=["age"] + partition_dir_expression="${k0}=${v0}" + is_partition_field_write_in_file=true + file_name_expression="${transactionId}_${now}" + file_format_type="text" + sink_columns=["name","age"] + filename_time_format="yyyy.MM.dd" + is_enable_transaction=true + save_mode="error", + source_table_name="fake,fake2" + } +} \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java index 49c9b9275da..28c6b4f0126 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java @@ -17,7 +17,6 @@ package org.apache.seatunnel.engine.core.job; -import org.apache.seatunnel.api.env.EnvCommonOptions; import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.utils.FileUtils; import org.apache.seatunnel.engine.common.config.JobConfig; @@ -36,7 +35,6 @@ import java.net.MalformedURLException; import java.net.URL; import java.nio.file.Files; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -68,26 +66,6 @@ public AbstractJobEnvironment(JobConfig jobConfig, boolean isStartWithSavePoint) this.isStartWithSavePoint = isStartWithSavePoint; this.idGenerator = new IdGenerator(); this.commonPluginJars.addAll(searchPluginJars()); - this.commonPluginJars.addAll( - new ArrayList<>( - Common.getThirdPartyJars( - jobConfig - .getEnvOptions() - .getOrDefault(EnvCommonOptions.JARS.key(), "") - .toString()) - .stream() - .map(Path::toUri) - .map( - uri -> { - try { - return uri.toURL(); - } catch (MalformedURLException e) { - throw new SeaTunnelEngineException( - "the uri of jar illegal:" + uri, e); - } - }) - .collect(Collectors.toList()))); - LOGGER.info("add common jar in plugins :" + commonPluginJars); } protected Set searchPluginJars() { @@ -149,5 +127,5 @@ protected LogicalDagGenerator getLogicalDagGenerator() { return new LogicalDagGenerator(actions, jobConfig, idGenerator, isStartWithSavePoint); } - protected abstract LogicalDag getLogicalDag(); + public abstract LogicalDag getLogicalDag(); } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index 172bbbff5f5..c3e8e414b2a 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -38,6 +38,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.transform.SeaTunnelTransform; import org.apache.seatunnel.common.Constants; +import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.TypesafeConfigUtils; import org.apache.seatunnel.common.constants.CollectionConstants; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; @@ -46,6 +47,7 @@ import org.apache.seatunnel.core.starter.utils.ConfigBuilder; import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.exception.JobDefineCheckException; +import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader; import org.apache.seatunnel.engine.common.utils.IdGenerator; import org.apache.seatunnel.engine.core.classloader.ClassLoaderService; @@ -69,7 +71,9 @@ import scala.Tuple2; import java.io.Serializable; +import java.net.MalformedURLException; import java.net.URL; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; @@ -167,6 +171,7 @@ public MultipleTableJobConfigParser( } public ImmutablePair, Set> parse(ClassLoaderService classLoaderService) { + this.fillJobConfigAndCommonJars(); List sourceConfigs = TypesafeConfigUtils.getConfigList( seaTunnelJobConfig, "source", Collections.emptyList()); @@ -194,7 +199,6 @@ public ImmutablePair, Set> parse(ClassLoaderService classLoade try { Thread.currentThread().setContextClassLoader(classLoader); ConfigParserUtil.checkGraph(sourceConfigs, transformConfigs, sinkConfigs); - this.fillJobConfig(); LinkedHashMap>> tableWithActionMap = new LinkedHashMap<>(); @@ -269,7 +273,7 @@ private void fillUsedFactoryUrls(List actions, Set result) { }); } - private void fillJobConfig() { + private void fillJobConfigAndCommonJars() { jobConfig.getJobContext().setJobMode(envOptions.get(EnvCommonOptions.JOB_MODE)); if (StringUtils.isEmpty(jobConfig.getName()) || jobConfig.getName().equals(Constants.LOGO) @@ -277,6 +281,26 @@ private void fillJobConfig() { jobConfig.setName(envOptions.get(EnvCommonOptions.JOB_NAME)); } jobConfig.getEnvOptions().putAll(envOptions.getSourceMap()); + this.commonPluginJars.addAll( + new ArrayList<>( + Common.getThirdPartyJars( + jobConfig + .getEnvOptions() + .getOrDefault(EnvCommonOptions.JARS.key(), "") + .toString()) + .stream() + .map(Path::toUri) + .map( + uri -> { + try { + return uri.toURL(); + } catch (MalformedURLException e) { + throw new SeaTunnelEngineException( + "the uri of jar illegal:" + uri, e); + } + }) + .collect(Collectors.toList()))); + log.info("add common jar in plugins :{}", commonPluginJars); } private static boolean isFallback( diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java index d13f1a49d8d..e7a1c5108af 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java @@ -31,6 +31,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair; +import com.google.common.annotations.VisibleForTesting; import com.hazelcast.instance.impl.Node; import com.hazelcast.spi.impl.NodeEngineImpl; @@ -77,8 +78,9 @@ public Long getJobId() { return jobId; } + @VisibleForTesting @Override - protected LogicalDag getLogicalDag() { + public LogicalDag getLogicalDag() { ImmutablePair, Set> immutablePair = getJobConfigParser().parse(seaTunnelServer.getClassLoaderService()); actions.addAll(immutablePair.getLeft());