Skip to content

Commit

Permalink
[Fix][Zeta] Fix env jars not working on zeta (apache#7035)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored and hawk9821 committed Jul 13, 2024
1 parent 62dc1a2 commit d7edcb7
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

import org.apache.commons.lang3.tuple.ImmutablePair;

import com.google.common.annotations.VisibleForTesting;

import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -106,8 +108,9 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
isStartWithSavePoint);
}

@VisibleForTesting
@Override
protected LogicalDag getLogicalDag() {
public LogicalDag getLogicalDag() {
ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse(null);
actions.addAll(immutablePair.getLeft());
// Enable upload connector jar package to engine server, automatically upload connector Jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
Expand All @@ -49,14 +50,14 @@
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
import lombok.extern.slf4j.Slf4j;

import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Spliterators;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
Expand Down Expand Up @@ -406,6 +407,26 @@ public void testGetJobInfo() {
}
}

@Test
public void testJarsInEnvAddedToCommonJars() {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("/client_test_with_jars.conf");
JobConfig jobConfig = new JobConfig();
jobConfig.setName("client_test_with_jars");
try (SeaTunnelClient seaTunnelClient = createSeaTunnelClient()) {
LogicalDag logicalDag =
seaTunnelClient
.createExecutionContext(filePath, jobConfig, SEATUNNEL_CONFIG)
.getLogicalDag();
Assertions.assertIterableEquals(
Arrays.asList("file:/tmp/test.jar", "file:/tmp/test2.jar"),
logicalDag.getLogicalVertexMap().values().iterator().next().getAction()
.getJarUrls().stream()
.map(URL::toString)
.collect(Collectors.toList()));
}
}

@Test
public void testSavePointAndRestoreWithSavePoint() throws Exception {
Common.setDeployMode(DeployMode.CLIENT);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
parallelism = 1
job.mode = "BATCH"
jars = "file:///tmp/test.jar;file:///tmp/test2.jar"
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
result_table_name = "fake"
schema = {
fields {
name = "string"
age = "int"
}
}
parallelism = 1
}

FakeSource {
result_table_name = "fake2"
schema = {
fields {
name = "string"
age = "int"
}
}
parallelism = 1
}
}

transform {
}

sink {
LocalFile {
path="/tmp/hive/warehouse/test2"
field_delimiter="\t"
row_delimiter="\n"
partition_by=["age"]
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
file_format_type="text"
sink_columns=["name","age"]
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
save_mode="error",
source_table_name="fake,fake2"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.seatunnel.engine.core.job;

import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.engine.common.config.JobConfig;
Expand All @@ -36,7 +35,6 @@
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -68,26 +66,6 @@ public AbstractJobEnvironment(JobConfig jobConfig, boolean isStartWithSavePoint)
this.isStartWithSavePoint = isStartWithSavePoint;
this.idGenerator = new IdGenerator();
this.commonPluginJars.addAll(searchPluginJars());
this.commonPluginJars.addAll(
new ArrayList<>(
Common.getThirdPartyJars(
jobConfig
.getEnvOptions()
.getOrDefault(EnvCommonOptions.JARS.key(), "")
.toString())
.stream()
.map(Path::toUri)
.map(
uri -> {
try {
return uri.toURL();
} catch (MalformedURLException e) {
throw new SeaTunnelEngineException(
"the uri of jar illegal:" + uri, e);
}
})
.collect(Collectors.toList())));
LOGGER.info("add common jar in plugins :" + commonPluginJars);
}

protected Set<URL> searchPluginJars() {
Expand Down Expand Up @@ -149,5 +127,5 @@ protected LogicalDagGenerator getLogicalDagGenerator() {
return new LogicalDagGenerator(actions, jobConfig, idGenerator, isStartWithSavePoint);
}

protected abstract LogicalDag getLogicalDag();
public abstract LogicalDag getLogicalDag();
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
Expand All @@ -46,6 +47,7 @@
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
Expand All @@ -69,7 +71,9 @@
import scala.Tuple2;

import java.io.Serializable;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -167,6 +171,7 @@ public MultipleTableJobConfigParser(
}

public ImmutablePair<List<Action>, Set<URL>> parse(ClassLoaderService classLoaderService) {
this.fillJobConfigAndCommonJars();
List<? extends Config> sourceConfigs =
TypesafeConfigUtils.getConfigList(
seaTunnelJobConfig, "source", Collections.emptyList());
Expand Down Expand Up @@ -194,7 +199,6 @@ public ImmutablePair<List<Action>, Set<URL>> parse(ClassLoaderService classLoade
try {
Thread.currentThread().setContextClassLoader(classLoader);
ConfigParserUtil.checkGraph(sourceConfigs, transformConfigs, sinkConfigs);
this.fillJobConfig();
LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>> tableWithActionMap =
new LinkedHashMap<>();

Expand Down Expand Up @@ -269,14 +273,34 @@ private void fillUsedFactoryUrls(List<Action> actions, Set<URL> result) {
});
}

private void fillJobConfig() {
private void fillJobConfigAndCommonJars() {
jobConfig.getJobContext().setJobMode(envOptions.get(EnvCommonOptions.JOB_MODE));
if (StringUtils.isEmpty(jobConfig.getName())
|| jobConfig.getName().equals(Constants.LOGO)
|| jobConfig.getName().equals(EnvCommonOptions.JOB_NAME.defaultValue())) {
jobConfig.setName(envOptions.get(EnvCommonOptions.JOB_NAME));
}
jobConfig.getEnvOptions().putAll(envOptions.getSourceMap());
this.commonPluginJars.addAll(
new ArrayList<>(
Common.getThirdPartyJars(
jobConfig
.getEnvOptions()
.getOrDefault(EnvCommonOptions.JARS.key(), "")
.toString())
.stream()
.map(Path::toUri)
.map(
uri -> {
try {
return uri.toURL();
} catch (MalformedURLException e) {
throw new SeaTunnelEngineException(
"the uri of jar illegal:" + uri, e);
}
})
.collect(Collectors.toList())));
log.info("add common jar in plugins :{}", commonPluginJars);
}

private static <T extends Factory> boolean isFallback(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.apache.commons.lang3.tuple.ImmutablePair;

import com.google.common.annotations.VisibleForTesting;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.spi.impl.NodeEngineImpl;

Expand Down Expand Up @@ -77,8 +78,9 @@ public Long getJobId() {
return jobId;
}

@VisibleForTesting
@Override
protected LogicalDag getLogicalDag() {
public LogicalDag getLogicalDag() {
ImmutablePair<List<Action>, Set<URL>> immutablePair =
getJobConfigParser().parse(seaTunnelServer.getClassLoaderService());
actions.addAll(immutablePair.getLeft());
Expand Down

0 comments on commit d7edcb7

Please sign in to comment.