[fix] fix get job metrics get NPE #8093

Merged
merged 6 commits into from
Nov 29, 2024
Changes from 5 commits
@@ -39,4 +39,4 @@ seatunnel:
       metric:
         enabled: false
       logs:
-        scheduled-deletion-enable: true
+        scheduled-deletion-enable: false
Member

why change this?

Member Author

If this is set to true, it starts the log deletion thread to delete log files. The thread first looks up the log folder.

But in CI, log4j prints to the console and does not write logs to a file: https://github.com/apache/seatunnel/blob/dev/seatunnel-e2e/seatunnel-e2e-common/src/test/resources/log4j2.properties#L25

So when this config is enabled, it throws an exception.

Contributor

This problem does exist. Is there a better way to avoid it?

Member Author

@liunaijie liunaijie Nov 28, 2024

Can we disable it in CI? We don't write log files during CI, and no test uses this config.

Member

Ok for me. The delete expired log file test case config is

.
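
On the question of a better way to avoid the problem: one option, sketched here as an illustration only, is for the cleanup code to treat a missing log folder as "nothing to delete". The class `LogCleanupSketch` and the method `listLogFiles` are hypothetical names, not SeaTunnel code, and the sketch assumes the failure comes from looking up a folder that does not exist when log4j only prints to the console.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch, not the SeaTunnel implementation.
public class LogCleanupSketch {

    // Returns the *.log files directly under logDir, or an empty list when
    // logDir is null or is not an existing directory (e.g. console-only logging).
    static List<Path> listLogFiles(Path logDir) {
        if (logDir == null || !Files.isDirectory(logDir)) {
            return Collections.emptyList();
        }
        try (Stream<Path> files = Files.list(logDir)) {
            return files.filter(p -> p.getFileName().toString().endsWith(".log"))
                    .collect(Collectors.toList());
        } catch (IOException e) {
            // Surface I/O failures without forcing callers to handle a checked exception.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // No configured folder: nothing to delete, no exception.
        System.out.println(listLogFiles(null).isEmpty());
        // A folder name that is assumed not to exist in the working directory.
        System.out.println(listLogFiles(Paths.get("no-such-log-dir")).isEmpty());
    }
}
```

With a guard like this the deletion thread could stay enabled in CI, at the cost of silently doing nothing when the folder is misconfigured, so logging a warning in the early-return branch may be worth considering.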

@@ -633,7 +633,8 @@ public PassiveCompletableFuture<Void> submitJob(
                 () -> {
                     try {
                         if (!isStartWithSavePoint
-                                && getJobHistoryService().getJobMetrics(jobId) != null) {
+                                && getJobHistoryService().getJobMetrics(jobId)
+                                        != JobMetrics.empty()) {
                             throw new JobException(
                                     String.format(
                                             "The job id %s has already been submitted and is not starting with a savepoint.",
@@ -779,7 +780,7 @@ public JobMetrics getJobMetrics(long jobId) {
         }
         JobMetrics jobMetrics = JobMetricsUtil.toJobMetrics(runningJobMaster.getCurrJobMetrics());
         JobMetrics jobMetricsImap = jobHistoryService.getJobMetrics(jobId);
-        return jobMetricsImap != null ? jobMetricsImap.merge(jobMetrics) : jobMetrics;
+        return jobMetricsImap != JobMetrics.empty() ? jobMetricsImap.merge(jobMetrics) : jobMetrics;
     }

     public Map<Long, JobMetrics> getRunningJobMetrics() {
@@ -831,7 +832,7 @@ public Map<Long, JobMetrics> getRunningJobMetrics() {
         longJobMetricsMap.forEach(
                 (jobId, jobMetrics) -> {
                     JobMetrics jobMetricsImap = jobHistoryService.getJobMetrics(jobId);
-                    if (jobMetricsImap != null) {
+                    if (jobMetricsImap != JobMetrics.empty()) {
                         longJobMetricsMap.put(jobId, jobMetricsImap.merge(jobMetrics));
                     }
                 });
@@ -183,7 +183,7 @@ public JobState getJobDetailState(Long jobId) {
     }

     public JobMetrics getJobMetrics(Long jobId) {
-        return finishedJobMetricsImap.getOrDefault(jobId, null);
+        return finishedJobMetricsImap.getOrDefault(jobId, JobMetrics.empty());
     }

     public JobDAGInfo getJobDAGInfo(Long jobId) {
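
The change above swaps a `null` default for an "empty" sentinel, and the callers then compare against `JobMetrics.empty()` with `!=`. A minimal sketch of that pattern follows. It uses a hypothetical `Metrics` class rather than the real `JobMetrics`, and it assumes `empty()` returns one shared instance. If `empty()` created a new object on each call, the reference comparison would always be true.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the null-object pattern; not the SeaTunnel classes.
public class EmptyMetricsSketch {

    // Stand-in for JobMetrics with a single shared EMPTY instance (an assumption).
    static final class Metrics {
        private static final Metrics EMPTY = new Metrics(Collections.emptyMap());
        private final Map<String, Long> values;

        private Metrics(Map<String, Long> values) {
            this.values = values;
        }

        static Metrics empty() {
            return EMPTY;
        }

        static Metrics of(Map<String, Long> values) {
            return new Metrics(values);
        }

        Map<String, Long> values() {
            return values;
        }
    }

    // Stand-in for the finished-job metrics map.
    static final Map<Long, Metrics> finished = new ConcurrentHashMap<>();

    // Never returns null: unknown job ids map to the shared empty instance.
    static Metrics getJobMetrics(long jobId) {
        return finished.getOrDefault(jobId, Metrics.empty());
    }

    // Reference comparison is valid only because empty() is a singleton here.
    static boolean alreadySubmitted(long jobId) {
        return getJobMetrics(jobId) != Metrics.empty();
    }

    public static void main(String[] args) {
        finished.put(1L, Metrics.of(Collections.singletonMap("rows", 10L)));
        System.out.println(alreadySubmitted(1L)); // true
        System.out.println(alreadySubmitted(2L)); // false
        System.out.println(getJobMetrics(2L).values().isEmpty()); // true, no NPE
    }
}
```

Callers that only read the result no longer need a null check. Callers that need to distinguish "no history" from "history with no values" still need the identity comparison or an explicit lookup.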
@@ -33,12 +33,14 @@

 import com.hazelcast.instance.impl.HazelcastInstanceImpl;
 import com.hazelcast.internal.serialization.Data;
+import lombok.extern.slf4j.Slf4j;

 import java.util.Collections;
 import java.util.concurrent.TimeUnit;

 import static org.awaitility.Awaitility.await;

+@Slf4j
 public class CoordinatorServiceTest {
     @Test
     public void testMasterNodeActive() {
@@ -96,26 +98,24 @@ public void testClearCoordinatorService() {
         CoordinatorService coordinatorService = server1.getCoordinatorService();
         Assertions.assertTrue(coordinatorService.isCoordinatorActive());

-        Long jobId =
-                coordinatorServiceTest
-                        .getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME)
-                        .newId();
+        Long jobId = System.currentTimeMillis();
Member

why?

Member Author

will revert this

+        log.info("jobId: {}", jobId);
         LogicalDag testLogicalDag =
                 TestUtils.createTestLogicalPlan(
                         "stream_fake_to_console.conf", "test_clear_coordinator_service", jobId);

         JobImmutableInformation jobImmutableInformation =
                 new JobImmutableInformation(
                         jobId,
                         "Test",
                         "test_clear_coordinator_service",
                         coordinatorServiceTest.getSerializationService().toData(testLogicalDag),
                         testLogicalDag.getJobConfig(),
                         Collections.emptyList(),
                         Collections.emptyList());

         Data data =
                 coordinatorServiceTest.getSerializationService().toData(jobImmutableInformation);

+        log.info("Start submit job.");
         coordinatorService
                 .submitJob(jobId, data, jobImmutableInformation.isStartWithSavePoint())
                 .join();