[Improve][all] change Log to @Slf4j #3001

Merged · 7 commits · Oct 8, 2022
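This PR replaces hand-written SLF4J logger fields with Lombok's @Slf4j annotation across the listed classes. @Slf4j generates a private static final org.slf4j.Logger field named log at compile time, so the explicit Logger/LoggerFactory imports and the per-class LOGGER declarations can be deleted, and every call site becomes log.info(...)/log.warn(...). A minimal before/after sketch of the pattern (Example is a hypothetical class, not one of the files changed here; Lombok must be on the annotation-processor path for this to compile):

// Before: one boilerplate logger field per class
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Example {
    private static final Logger LOGGER = LoggerFactory.getLogger(Example.class);

    void run() {
        LOGGER.info("register plugins : {}", "some-plugin");
    }
}

// After: Lombok generates an equivalent static field named "log"
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Example {
    void run() {
        log.info("register plugins : {}", "some-plugin");
    }
}

The per-file diffs below all follow this same mechanical substitution.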
FlinkEnvironment.java

@@ -27,6 +27,7 @@

 import org.apache.seatunnel.shade.com.typesafe.config.Config;

+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -46,19 +47,16 @@
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.TernaryBoolean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;

+@Slf4j
 public class FlinkEnvironment implements RuntimeEnv {

-    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkEnvironment.class);
-
     private Config config;

     private StreamExecutionEnvironment environment;
@@ -127,7 +125,7 @@ public JobMode getJobMode() {

     @Override
     public void registerPlugin(List<URL> pluginPaths) {
-        pluginPaths.forEach(url -> LOGGER.info("register plugins : {}", url));
+        pluginPaths.forEach(url -> log.info("register plugins : {}", url));
         List<Configuration> configurations = new ArrayList<>();
         try {
             configurations.add((Configuration) Objects.requireNonNull(ReflectionUtils.getDeclaredMethod(StreamExecutionEnvironment.class,
@@ -248,7 +246,7 @@ private void setTimeCharacteristic() {
                 environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
                 break;
             default:
-                LOGGER.warn(
+                log.warn(
                     "set time-characteristic failed, unknown time-characteristic [{}],only support event-time,ingestion-time,processing-time",
                     timeType);
                 break;
@@ -272,7 +270,7 @@ private void setCheckpoint() {
                 checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
                 break;
             default:
-                LOGGER.warn(
+                log.warn(
                     "set checkpoint.mode failed, unknown checkpoint.mode [{}],only support exactly-once,at-least-once",
                     mode);
                 break;
FlinkBatchExecution.java

@@ -23,22 +23,20 @@

 import org.apache.seatunnel.shade.com.typesafe.config.Config;

+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
 import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;

+@Slf4j
 public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBatchTransform, FlinkBatchSink, FlinkEnvironment> {

-    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkBatchExecution.class);
-
     private Config config;

     private final FlinkEnvironment flinkEnvironment;
@@ -73,11 +71,11 @@ public void start(List<FlinkBatchSource> sources, List<FlinkBatchTransform> tran

         if (whetherExecute(sinks)) {
             try {
-                LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getBatchEnvironment().getExecutionPlan());
+                log.info("Flink Execution Plan:{}", flinkEnvironment.getBatchEnvironment().getExecutionPlan());
                 JobExecutionResult execute = flinkEnvironment.getBatchEnvironment().execute(flinkEnvironment.getJobName());
-                LOGGER.info(execute.toString());
+                log.info(execute.toString());
             } catch (Exception e) {
-                LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
+                log.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
                 throw e;
             }
         }
FlinkStreamExecution.java

@@ -24,21 +24,19 @@

 import org.apache.seatunnel.shade.com.typesafe.config.Config;

+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;

+@Slf4j
 public class FlinkStreamExecution implements Execution<FlinkStreamSource, FlinkStreamTransform, FlinkStreamSink, FlinkEnvironment> {

-    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkStreamExecution.class);
-
     private Config config;

     private final FlinkEnvironment flinkEnvironment;
@@ -71,10 +69,10 @@ public void start(List<FlinkStreamSource> sources, List<FlinkStreamTransform> tr
             sink.outputStream(flinkEnvironment, stream);
         }
         try {
-            LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
+            log.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
             flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
         } catch (Exception e) {
-            LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
+            log.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
             throw e;
         }
     }
EnvironmentUtil.java

@@ -21,18 +21,16 @@

 import org.apache.seatunnel.shade.com.typesafe.config.Config;

+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.util.concurrent.TimeUnit;

+@Slf4j
 public final class EnvironmentUtil {

-    private static final Logger LOGGER = LoggerFactory.getLogger(EnvironmentUtil.class);
-
     private EnvironmentUtil() {
     }

@@ -58,11 +56,11 @@ public static void setRestartStrategy(Config config, ExecutionConfig executionCo
                             Time.of(delayInterval, TimeUnit.MILLISECONDS)));
                         break;
                     default:
-                        LOGGER.warn("set restart.strategy failed, unknown restart.strategy [{}],only support no,fixed-delay,failure-rate", restartStrategy);
+                        log.warn("set restart.strategy failed, unknown restart.strategy [{}],only support no,fixed-delay,failure-rate", restartStrategy);
                 }
             }
         } catch (Exception e) {
-            LOGGER.warn("set restart.strategy in config '{}' exception", config, e);
+            log.warn("set restart.strategy in config '{}' exception", config, e);
         }
     }
SparkEnvironment.java

@@ -28,22 +28,20 @@

 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

+import lombok.extern.slf4j.Slf4j;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.streaming.Seconds;
 import org.apache.spark.streaming.StreamingContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.net.URL;
 import java.util.List;

+@Slf4j
 public class SparkEnvironment implements RuntimeEnv {

-    private static final Logger LOGGER = LoggerFactory.getLogger(SparkEnvironment.class);
-
     private static final long DEFAULT_SPARK_STREAMING_DURATION = 5;

     private SparkConf sparkConf;
@@ -92,7 +90,7 @@ public CheckResult checkConfig() {

     @Override
     public void registerPlugin(List<URL> pluginPaths) {
-        LOGGER.info("register plugins :" + pluginPaths);
+        log.info("register plugins :" + pluginPaths);
         // TODO we use --jar parameter to support submit multi-jar in spark cluster at now. Refactor it to
         // support submit multi-jar in code or remove this logic.
         // this.sparkSession.conf().set("spark.jars",pluginPaths.stream().map(URL::getPath).collect(Collectors.joining(",")));
ClickhouseSinkWriter.java

@@ -39,9 +39,8 @@

 import com.clickhouse.jdbc.internal.ClickHouseConnectionImpl;
 import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.sql.PreparedStatement;
@@ -54,10 +53,9 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;

+@Slf4j
 public class ClickhouseSinkWriter implements SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> {

-    private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseSinkWriter.class);
-
     private final Context context;
     private final ReaderOption option;
     private final ShardRouter shardRouter;
ClickhouseFileSinkWriter.java

@@ -30,9 +30,8 @@
 import com.clickhouse.client.ClickHouseException;
 import com.clickhouse.client.ClickHouseRequest;
 import com.clickhouse.client.ClickHouseResponse;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.io.BufferedReader;
 import java.io.File;
@@ -55,8 +54,8 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;

+@Slf4j
 public class ClickhouseFileSinkWriter implements SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> {
-    private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseFileSinkWriter.class);
     private static final String CLICKHOUSE_LOCAL_FILE_PREFIX = "/tmp/clickhouse-local/seatunnel-file";
     private static final int UUID_LENGTH = 10;
     private final FileReaderOption readerOption;
@@ -178,7 +177,7 @@ private List<String> generateClickhouseLocalFiles(List<SeaTunnelRow> rows) throw
                 uuid));
         command.add("--path");
         command.add("\"" + clickhouseLocalFile + "\"");
-        LOGGER.info("Generate clickhouse local file command: {}", String.join(" ", command));
+        log.info("Generate clickhouse local file command: {}", String.join(" ", command));
         ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", String.join(" ", command));
         Process start = processBuilder.start();
         // we just wait for the process to finish
@@ -187,7 +186,7 @@ private List<String> generateClickhouseLocalFiles(List<SeaTunnelRow> rows) throw
              BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {
             String line;
             while ((line = bufferedReader.readLine()) != null) {
-                LOGGER.info(line);
+                log.info(line);
             }
         }
         start.waitFor();
RsyncFileTransfer.java

@@ -17,11 +17,10 @@

 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;

+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.session.ClientSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.io.BufferedReader;
 import java.io.IOException;
@@ -30,10 +29,9 @@
 import java.util.ArrayList;
 import java.util.List;

+@Slf4j
 public class RsyncFileTransfer implements FileTransfer {

-    private static final Logger LOGGER = LoggerFactory.getLogger(RsyncFileTransfer.class);
-
     private static final int SSH_PORT = 22;

     private final String host;
@@ -84,7 +82,7 @@ public void transferAndChown(String sourcePath, String targetPath) {
         rsyncCommand.add(sshParameter);
         rsyncCommand.add(sourcePath);
         rsyncCommand.add(String.format("root@%s:%s", host, targetPath));
-        LOGGER.info("Generate rsync command: {}", String.join(" ", rsyncCommand));
+        log.info("Generate rsync command: {}", String.join(" ", rsyncCommand));
         ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", String.join(" ", rsyncCommand));
         Process start = processBuilder.start();
         // we just wait for the process to finish
@@ -93,7 +91,7 @@ public void transferAndChown(String sourcePath, String targetPath) {
              BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {
             String line;
             while ((line = bufferedReader.readLine()) != null) {
-                LOGGER.info(line);
+                log.info(line);
             }
         }
         start.waitFor();
@@ -110,7 +108,7 @@ public void transferAndChown(String sourcePath, String targetPath) {
         command.add("| tail -n 1 | awk '{print $3}' | xargs -t -i chown -R {}:{} " + targetPath);
         try {
             String finalCommand = String.join(" ", command);
-            LOGGER.info("execute remote command: " + finalCommand);
+            log.info("execute remote command: " + finalCommand);
             clientSession.executeRemoteCommand(finalCommand);
         } catch (IOException e) {
             // always return error cause xargs return shell command result
ScpFileTransfer.java

@@ -17,22 +17,20 @@

 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;

+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.session.ClientSession;
 import org.apache.sshd.scp.client.ScpClient;
 import org.apache.sshd.scp.client.ScpClientCreator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;

+@Slf4j
 public class ScpFileTransfer implements FileTransfer {

-    private static final Logger LOGGER = LoggerFactory.getLogger(ScpFileTransfer.class);
-
     private static final int SCP_PORT = 22;

     private final String host;
@@ -90,7 +88,7 @@ public void transferAndChown(String sourcePath, String targetPath) {
         command.add("| tail -n 1 | awk '{print $3}' | xargs -t -i chown -R {}:{} " + targetPath);
         try {
             String finalCommand = String.join(" ", command);
-            LOGGER.info("execute remote command: " + finalCommand);
+            log.info("execute remote command: " + finalCommand);
             clientSession.executeRemoteCommand(finalCommand);
         } catch (IOException e) {
             // always return error cause xargs return shell command result
ElasticsearchSinkWriter.java

@@ -33,8 +33,7 @@

 import org.apache.seatunnel.shade.com.typesafe.config.Config;

-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;

 import java.io.IOException;
 import java.util.ArrayList;
@@ -44,6 +43,7 @@
 /**
  * ElasticsearchSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Elasticsearch.
  */
+@Slf4j
 public class ElasticsearchSinkWriter<ElasticsearchSinkStateT> implements SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkStateT> {

     private final SinkWriter.Context context;
@@ -52,9 +52,6 @@ public class ElasticsearchSinkWriter<ElasticsearchSinkStateT> implements SinkWri
     private final List<String> requestEsList;
     private EsRestClient esRestClient;

-
-    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchSinkWriter.class);
-
     public ElasticsearchSinkWriter(
             SinkWriter.Context context,
             SeaTunnelRowType seaTunnelRowType,
@@ -115,7 +112,7 @@ public void bulkEsWithRetry(EsRestClient esRestClient, List<String> requestEsLis
             if (tryCnt == maxRetry) {
                 throw new BulkElasticsearchException("bulk es error,try count=%d", ex);
             }
-            LOGGER.warn(String.format("bulk es error,try count=%d", tryCnt), ex);
+            log.warn(String.format("bulk es error,try count=%d", tryCnt), ex);

         }