[vpj][controller] Emit push job status metrics from controller #1185

Merged: 8 commits, merged Sep 24, 2024
Changes from 4 commits
build.gradle (2 changes: 1 addition & 1 deletion)
@@ -125,7 +125,7 @@ ext.libraries = [
snappy: 'org.iq80.snappy:snappy:0.4',
spark: 'com.sparkjava:spark-core:2.9.4', // Spark-Java Rest framework
spotbugs: 'com.github.spotbugs:spotbugs:4.5.2',
tehuti: 'io.tehuti:tehuti:0.11.4',
tehuti: 'io.tehuti:tehuti:0.12.2',
testcontainers: 'org.testcontainers:testcontainers:1.18.0',
testng: 'org.testng:testng:6.14.3',
tomcatAnnotations: 'org.apache.tomcat:annotations-api:6.0.53',
VenicePushJob.java
@@ -60,6 +60,7 @@
import static com.linkedin.venice.vpj.VenicePushJobConstants.PERMISSION_777;
import static com.linkedin.venice.vpj.VenicePushJobConstants.POLL_JOB_STATUS_INTERVAL_MS;
import static com.linkedin.venice.vpj.VenicePushJobConstants.POLL_STATUS_RETRY_ATTEMPTS;
import static com.linkedin.venice.vpj.VenicePushJobConstants.PUSH_JOB_FAILURE_CHECKPOINTS_TO_DEFINE_USER_ERROR;
import static com.linkedin.venice.vpj.VenicePushJobConstants.PUSH_JOB_STATUS_UPLOAD_ENABLE;
import static com.linkedin.venice.vpj.VenicePushJobConstants.REPUSH_TTL_ENABLE;
import static com.linkedin.venice.vpj.VenicePushJobConstants.REPUSH_TTL_SECONDS;
@@ -87,6 +88,7 @@
import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_STORE_NAME_PROP;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.PushJobCheckpoints;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.ZstdWithDictCompressor;
import com.linkedin.venice.controllerapi.ControllerClient;
@@ -261,33 +263,6 @@ public class VenicePushJob implements AutoCloseable {
private final PushJobHeartbeatSenderFactory pushJobHeartbeatSenderFactory;
private boolean pushJobStatusUploadDisabledHasBeenLogged = false;

/**
* Different successful checkpoints and known error scenarios of the VPJ flow.
* 1. The enums are not sequential
* 2. Non-negative enums are successful checkpoints
* 3. Negative enums are error scenarios (Can be user or system errors)
*/
public enum PushJobCheckpoints {
INITIALIZE_PUSH_JOB(0), NEW_VERSION_CREATED(1), START_DATA_WRITER_JOB(2), DATA_WRITER_JOB_COMPLETED(3),
START_JOB_STATUS_POLLING(4), JOB_STATUS_POLLING_COMPLETED(5), START_VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB(6),
VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB_COMPLETED(7), QUOTA_EXCEEDED(-1), WRITE_ACL_FAILED(-2),
DUP_KEY_WITH_DIFF_VALUE(-3), INPUT_DATA_SCHEMA_VALIDATION_FAILED(-4),
EXTENDED_INPUT_DATA_SCHEMA_VALIDATION_FAILED(-5), RECORD_TOO_LARGE_FAILED(-6), CONCURRENT_BATCH_PUSH(-7),
DATASET_CHANGED(-8), INVALID_INPUT_FILE(-9), ZSTD_DICTIONARY_CREATION_FAILED(-10),
DVC_INGESTION_ERROR_DISK_FULL(-11), DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED(-12),
DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES(-13), DVC_INGESTION_ERROR_OTHER(-14);

private final int value;

PushJobCheckpoints(int value) {
this.value = value;
}

public int getValue() {
return value;
}
}
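
Since this enum is being moved out of VenicePushJob (note the new com.linkedin.venice.PushJobCheckpoints import above), here is a minimal, purely illustrative sketch of the sign convention its javadoc describes, assuming the moved enum keeps the same constants and the getValue() accessor shown here; the class and helper names below are hypothetical:

```java
import com.linkedin.venice.PushJobCheckpoints;

public final class CheckpointSignExample {
  // Hypothetical helper: per the javadoc above, negative checkpoint values are error
  // scenarios and non-negative values are successful checkpoints.
  static boolean isErrorCheckpoint(PushJobCheckpoints checkpoint) {
    return checkpoint.getValue() < 0;
  }

  public static void main(String[] args) {
    System.out.println(isErrorCheckpoint(PushJobCheckpoints.QUOTA_EXCEEDED)); // true, value is -1
    System.out.println(isErrorCheckpoint(PushJobCheckpoints.NEW_VERSION_CREATED)); // false, value is 1
  }
}
```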

/**
* @param jobId id of the job
* @param vanillaProps Property bag for the job
@@ -314,6 +289,11 @@ public VenicePushJob(String jobId, Properties vanillaProps) {
LOGGER.info("Push job heartbeat is NOT enabled.");
this.pushJobHeartbeatSenderFactory = new NoOpPushJobHeartbeatSenderFactory();
}
if (props.containsKey(PUSH_JOB_FAILURE_CHECKPOINTS_TO_DEFINE_USER_ERROR)) {
LOGGER.info(
"using user provided push job failure checkpoints to define user error: {}",
props.getString(PUSH_JOB_FAILURE_CHECKPOINTS_TO_DEFINE_USER_ERROR));
}
emptyPushZstdDictionary =
Lazy.of(() -> ByteBuffer.wrap(ZstdWithDictCompressor.buildDictionaryOnSyntheticAvroData()));
sharedTmpDir = new Path(pushJobSetting.sharedTmpDir);
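
The constructor change above only logs the override when the property is present. Below is a minimal sketch of how a job owner might supply it through the Properties bag accepted by VenicePushJob(String jobId, Properties vanillaProps); the class name, job id, and chosen checkpoint values are placeholders, and the commented-out constructor call assumes the rest of the required VPJ properties are configured:

```java
import java.util.Properties;

public final class PushJobUserErrorConfigExample {
  public static void main(String[] args) {
    // Placeholder property bag; a real push job also needs its usual required VPJ properties.
    Properties props = new Properties();

    // Comma-separated checkpoints that should be classified as user errors. The key matches
    // PUSH_JOB_FAILURE_CHECKPOINTS_TO_DEFINE_USER_ERROR and the values must come from the
    // supported ERROR checkpoint list documented in VenicePushJobConstants.
    props.setProperty(
        "push.job.failure.checkpoints.to.define.user.error",
        "QUOTA_EXCEEDED,WRITE_ACL_FAILED,INPUT_DATA_SCHEMA_VALIDATION_FAILED");

    // Passing this bag to the constructor shown above would trigger the new log line:
    // new VenicePushJob("example-job-id", props);
    System.out.println(props);
  }
}
```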
VenicePushJobConstants.java
@@ -345,6 +345,23 @@ private VenicePushJobConstants() {
/** Maximum final dictionary size TODO add more details about the current limits */
public static final String COMPRESSION_DICTIONARY_SIZE_LIMIT = "compression.dictionary.size.limit";

/**
 * Overrides the default checkpoint list PushJobCheckpoints#DEFAULT_USER_ERROR_CHECKPOINTS based on the
 * user's requirement: push job failure metrics are emitted as user errors or non-user errors depending on
 * whether the failing checkpoint is in this custom checkpoint list.
 *
 * List of supported ERROR checkpoints: the config should contain one or more of the following checkpoints, separated by commas:
 * QUOTA_EXCEEDED, WRITE_ACL_FAILED, DUP_KEY_WITH_DIFF_VALUE, INPUT_DATA_SCHEMA_VALIDATION_FAILED,
 * EXTENDED_INPUT_DATA_SCHEMA_VALIDATION_FAILED, RECORD_TOO_LARGE_FAILED, CONCURRENT_BATCH_PUSH,
 * DATASET_CHANGED, INVALID_INPUT_FILE, ZSTD_DICTIONARY_CREATION_FAILED, DVC_INGESTION_ERROR_DISK_FULL,
 * DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED, DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES,
 * DVC_INGESTION_ERROR_OTHER
 *
 * In case of an invalid config, the default list of checkpoints will be used.
 */
public static final String PUSH_JOB_FAILURE_CHECKPOINTS_TO_DEFINE_USER_ERROR =
"push.job.failure.checkpoints.to.define.user.error";

// Compute engine abstraction
/**
* Config to set the class for the DataWriter job. When using KIF, we currently will continue to fall back to MR mode.