Skip to content

Commit

Permalink
[Bug][Engine] Fix SeaTunnel Engine Local Mode Can't Deserialize Split (
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored Dec 28, 2022
1 parent d0c6217 commit 52c6bf2
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.seatunnel.core.starter.config.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
import org.apache.seatunnel.engine.common.loader.SeatunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
Expand Down Expand Up @@ -108,6 +109,7 @@ public JobConfigParser(@NonNull String jobDefineFilePath,
}

public ImmutablePair<List<Action>, Set<URL>> parse() {
Thread.currentThread().setContextClassLoader(new SeatunnelChildFirstClassLoader(new ArrayList<>()));
List<? extends Config> sinkConfigs = seaTunnelJobConfig.getConfigList("sink");
List<? extends Config> transformConfigs =
TypesafeConfigUtils.getConfigList(seaTunnelJobConfig, "transform", Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
* Add jar url to classloader. The different engine should have different logic to add url into
* their own classloader
*/
private static final BiConsumer DEFAULT_URL_TO_CLASSLOADER = (classLoader, url) -> {
private static final BiConsumer<ClassLoader, URL> DEFAULT_URL_TO_CLASSLOADER = (classLoader, url) -> {
if (classLoader instanceof URLClassLoader) {
ReflectionUtils.invoke(classLoader, "addURL", url);
} else {
Expand Down Expand Up @@ -210,7 +210,7 @@ public Map<PluginType, LinkedHashMap<PluginIdentifier, OptionRule>> getAllPlugin
if (this.pluginDir.toFile().exists()) {
log.info("load plugin from plugin dir: {}", this.pluginDir);
List<URL> files = FileUtils.searchJarFiles(this.pluginDir);
factories = FactoryUtil.discoverFactories(new URLClassLoader((URL[]) files.toArray(new URL[0])));
factories = FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])));
} else {
log.info("plugin dir: {} not exists, load plugin from classpath", this.pluginDir);
factories = FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());
Expand All @@ -220,9 +220,7 @@ public Map<PluginType, LinkedHashMap<PluginIdentifier, OptionRule>> getAllPlugin

factories.forEach(plugin -> {
if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {
if (plugins.get(PluginType.SOURCE) == null) {
plugins.put(PluginType.SOURCE, new LinkedHashMap());
}
plugins.computeIfAbsent(PluginType.SOURCE, k -> new LinkedHashMap<>());

plugins.get(PluginType.SOURCE).put(PluginIdentifier.of(
"seatunnel",
Expand All @@ -234,9 +232,7 @@ public Map<PluginType, LinkedHashMap<PluginIdentifier, OptionRule>> getAllPlugin
}

if (TableSinkFactory.class.isAssignableFrom(plugin.getClass())) {
if (plugins.get(PluginType.SINK) == null) {
plugins.put(PluginType.SINK, new LinkedHashMap());
}
plugins.computeIfAbsent(PluginType.SINK, k -> new LinkedHashMap<>());

plugins.get(PluginType.SINK).put(PluginIdentifier.of(
"seatunnel",
Expand All @@ -248,9 +244,7 @@ public Map<PluginType, LinkedHashMap<PluginIdentifier, OptionRule>> getAllPlugin
}

if (TableTransformFactory.class.isAssignableFrom(plugin.getClass())) {
if (plugins.get(PluginType.TRANSFORM) == null) {
plugins.put(PluginType.TRANSFORM, new LinkedHashMap());
}
plugins.computeIfAbsent(PluginType.TRANSFORM, k -> new LinkedHashMap<>());

plugins.get(PluginType.TRANSFORM).put(PluginIdentifier.of(
"seatunnel",
Expand Down

0 comments on commit 52c6bf2

Please sign in to comment.