Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[vpj][controller] Emit push job status metrics from controller #1185

Merged
merged 8 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ ext.libraries = [
snappy: 'org.iq80.snappy:snappy:0.4',
spark: 'com.sparkjava:spark-core:2.9.4', // Spark-Java Rest framework
spotbugs: 'com.github.spotbugs:spotbugs:4.5.2',
tehuti: 'io.tehuti:tehuti:0.11.4',
tehuti: 'io.tehuti:tehuti:0.12.2',
testcontainers: 'org.testcontainers:testcontainers:1.18.0',
testng: 'org.testng:testng:6.14.3',
tomcatAnnotations: 'org.apache.tomcat:annotations-api:6.0.53',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_STORE_NAME_PROP;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.PushJobCheckpoints;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.ZstdWithDictCompressor;
import com.linkedin.venice.controllerapi.ControllerClient;
Expand Down Expand Up @@ -259,35 +260,9 @@ public class VenicePushJob implements AutoCloseable {

private InputStorageQuotaTracker inputStorageQuotaTracker;
private final PushJobHeartbeatSenderFactory pushJobHeartbeatSenderFactory;
private PushJobHeartbeatSender pushJobHeartbeatSender = null;
private boolean pushJobStatusUploadDisabledHasBeenLogged = false;

/**
 * Checkpoints that a Venice push job (VPJ) can report, covering both successful milestones and
 * known failure scenarios.
 *
 * <ul>
 *   <li>The numeric values are not sequential.</li>
 *   <li>A non-negative value denotes a successful checkpoint.</li>
 *   <li>A negative value denotes an error scenario (can be a user or a system error).</li>
 * </ul>
 */
public enum PushJobCheckpoints {
  // Successful checkpoints (non-negative values)
  INITIALIZE_PUSH_JOB(0),
  NEW_VERSION_CREATED(1),
  START_DATA_WRITER_JOB(2),
  DATA_WRITER_JOB_COMPLETED(3),
  START_JOB_STATUS_POLLING(4),
  JOB_STATUS_POLLING_COMPLETED(5),
  START_VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB(6),
  VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB_COMPLETED(7),

  // Error scenarios (negative values)
  QUOTA_EXCEEDED(-1),
  WRITE_ACL_FAILED(-2),
  DUP_KEY_WITH_DIFF_VALUE(-3),
  INPUT_DATA_SCHEMA_VALIDATION_FAILED(-4),
  EXTENDED_INPUT_DATA_SCHEMA_VALIDATION_FAILED(-5),
  RECORD_TOO_LARGE_FAILED(-6),
  CONCURRENT_BATCH_PUSH(-7),
  DATASET_CHANGED(-8),
  INVALID_INPUT_FILE(-9),
  ZSTD_DICTIONARY_CREATION_FAILED(-10),
  DVC_INGESTION_ERROR_DISK_FULL(-11),
  DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED(-12),
  DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES(-13),
  DVC_INGESTION_ERROR_OTHER(-14);

  /** Numeric code associated with this checkpoint. */
  private final int checkpointCode;

  PushJobCheckpoints(int checkpointCode) {
    this.checkpointCode = checkpointCode;
  }

  /**
   * @return the numeric code of this checkpoint: non-negative for a successful checkpoint,
   *         negative for an error scenario
   */
  public int getValue() {
    return this.checkpointCode;
  }
}

/**
* @param jobId id of the job
* @param vanillaProps Property bag for the job
Expand Down Expand Up @@ -675,7 +650,6 @@ DataWriterComputeJob getDataWriterComputeJob() {
* @throws VeniceException
*/
public void run() {
PushJobHeartbeatSender pushJobHeartbeatSender = null;
try {
Optional<SSLFactory> sslFactory = VPJSSLUtils.createSSLFactory(
pushJobSetting.enableSSL,
Expand Down Expand Up @@ -874,8 +848,6 @@ public void run() {
pushJobDetails.overallStatus.add(getPushJobDetailsStatusTuple(PushJobDetailsStatus.COMPLETED.getValue()));
}
pushJobDetails.jobDurationInMs = LatencyUtils.getElapsedTimeFromMsToMs(pushJobSetting.jobStartTimeMs);
updatePushJobDetailsWithConfigs();
updatePushJobDetailsWithLivenessHeartbeatException(pushJobHeartbeatSender);
sendPushJobDetailsToController();

// only kick off the validation and post-validation flow when everything has to be done in a single VPJ
Expand Down Expand Up @@ -903,8 +875,6 @@ public void run() {
pushJobDetails.overallStatus.add(getPushJobDetailsStatusTuple(PushJobDetailsStatus.ERROR.getValue()));
pushJobDetails.failureDetails = e.toString();
pushJobDetails.jobDurationInMs = LatencyUtils.getElapsedTimeFromMsToMs(pushJobSetting.jobStartTimeMs);
updatePushJobDetailsWithConfigs();
updatePushJobDetailsWithLivenessHeartbeatException(pushJobHeartbeatSender);
sendPushJobDetailsToController();
closeVeniceWriter();
} catch (Exception ex) {
Expand All @@ -924,6 +894,7 @@ public void run() {
Utils.closeQuietlyWithErrorLogged(inputDataInfoProvider);
if (pushJobHeartbeatSender != null) {
pushJobHeartbeatSender.stop();
pushJobHeartbeatSender = null;
}
inputDataInfoProvider = null;
if (pushJobSetting.rmdSchemaDir != null) {
Expand Down Expand Up @@ -1018,7 +989,7 @@ private PushJobHeartbeatSender createPushJobHeartbeatSender(final boolean sslEna
}
}

private void updatePushJobDetailsWithLivenessHeartbeatException(PushJobHeartbeatSender pushJobHeartbeatSender) {
private void updatePushJobDetailsWithLivenessHeartbeatException() {
if (pushJobHeartbeatSender == null || this.pushJobDetails == null) {
return;
}
Expand Down Expand Up @@ -1840,6 +1811,12 @@ private void sendPushJobDetailsToController() {
LOGGER.warn("Unable to send push job details for monitoring purpose. The payload was not populated properly");
return;
}

// update push job details with more info if needed
updatePushJobDetailsWithConfigs();
updatePushJobDetailsWithLivenessHeartbeatException();

// send push job details to controller
try {
pushJobDetails.reportTimestamp = System.currentTimeMillis();
int version = pushJobSetting.version <= 0 ? UNCREATED_VERSION_NUMBER : pushJobSetting.version;
Expand Down Expand Up @@ -2779,7 +2756,6 @@ public void cancel() {
pushJobDetails.overallStatus.add(getPushJobDetailsStatusTuple(PushJobDetailsStatus.KILLED.getValue()));
}
pushJobDetails.jobDurationInMs = LatencyUtils.getElapsedTimeFromMsToMs(pushJobSetting.jobStartTimeMs);
updatePushJobDetailsWithConfigs();
sendPushJobDetailsToController();
}

Expand Down
Loading
Loading