Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[vpj][controller] Emit push job status metrics from controller #1185

Merged
merged 8 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ ext.libraries = [
snappy: 'org.iq80.snappy:snappy:0.4',
spark: 'com.sparkjava:spark-core:2.9.4', // Spark-Java Rest framework
spotbugs: 'com.github.spotbugs:spotbugs:4.5.2',
tehuti: 'io.tehuti:tehuti:0.11.4',
tehuti: 'io.tehuti:tehuti:0.12.2',
testcontainers: 'org.testcontainers:testcontainers:1.18.0',
testng: 'org.testng:testng:6.14.3',
tomcatAnnotations: 'org.apache.tomcat:annotations-api:6.0.53',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import static com.linkedin.venice.utils.ByteUtils.generateHumanReadableByteCountString;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.PushJobCheckpoints;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.ZstdWithDictCompressor;
import com.linkedin.venice.controllerapi.ControllerClient;
Expand Down Expand Up @@ -261,33 +262,6 @@ public class VenicePushJob implements AutoCloseable {
private final PushJobHeartbeatSenderFactory pushJobHeartbeatSenderFactory;
private boolean pushJobStatusUploadDisabledHasBeenLogged = false;

/**
* Different successful checkpoints and known error scenarios of the VPJ flow.
* 1. The enums are not sequential
* 2. Non-negative enums are successful checkpoints
* 3. Negative enums are error scenarios (Can be user or system errors)
*/
public enum PushJobCheckpoints {
INITIALIZE_PUSH_JOB(0), NEW_VERSION_CREATED(1), START_DATA_WRITER_JOB(2), DATA_WRITER_JOB_COMPLETED(3),
START_JOB_STATUS_POLLING(4), JOB_STATUS_POLLING_COMPLETED(5), START_VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB(6),
VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB_COMPLETED(7), QUOTA_EXCEEDED(-1), WRITE_ACL_FAILED(-2),
DUP_KEY_WITH_DIFF_VALUE(-3), INPUT_DATA_SCHEMA_VALIDATION_FAILED(-4),
EXTENDED_INPUT_DATA_SCHEMA_VALIDATION_FAILED(-5), RECORD_TOO_LARGE_FAILED(-6), CONCURRENT_BATCH_PUSH(-7),
DATASET_CHANGED(-8), INVALID_INPUT_FILE(-9), ZSTD_DICTIONARY_CREATION_FAILED(-10),
DVC_INGESTION_ERROR_DISK_FULL(-11), DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED(-12),
DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES(-13), DVC_INGESTION_ERROR_OTHER(-14);

private final int value;

PushJobCheckpoints(int value) {
this.value = value;
}

public int getValue() {
return value;
}
}

/**
* @param jobId id of the job
* @param vanillaProps Property bag for the job
Expand Down
nisargthakkar marked this conversation as resolved.
Show resolved Hide resolved

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.linkedin.venice.PushJobCheckpoints;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
Expand Down Expand Up @@ -883,7 +884,7 @@ public void testGetPerColoPushJobDetailsStatusFromExecutionStatus() {
}

/**
* Tests that the error message for the {@link VenicePushJob.PushJobCheckpoints#RECORD_TOO_LARGE_FAILED} code path of
* Tests that the error message for the {@link com.linkedin.venice.PushJobCheckpoints#RECORD_TOO_LARGE_FAILED} code path of
* {@link VenicePushJob#updatePushJobDetailsWithJobDetails(DataWriterTaskTracker)} uses maxRecordSizeBytes.
*/
@Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
Expand All @@ -901,7 +902,7 @@ public void testUpdatePushJobDetailsWithJobDetailsRecordTooLarge(boolean chunkin
final String errorMessage = vpj.updatePushJobDetailsWithJobDetails(dataWriterTaskTracker);
final int latestCheckpoint = pushJobDetails.pushJobLatestCheckpoint;
Assert.assertTrue(errorMessage.contains((chunkingEnabled) ? "100.0 MiB" : "950.0 KiB"), errorMessage);
Assert.assertEquals(latestCheckpoint, VenicePushJob.PushJobCheckpoints.RECORD_TOO_LARGE_FAILED.getValue());
Assert.assertEquals(latestCheckpoint, PushJobCheckpoints.RECORD_TOO_LARGE_FAILED.getValue());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.linkedin.venice.pushmonitor;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import com.linkedin.venice.utils.EnumUtils;
import com.linkedin.venice.utils.VeniceEnumValue;
import java.util.List;


/**
Expand All @@ -21,7 +21,7 @@
*
* TODO: Break this up in JobExecutionStatus and TaskExecutionStatus. It's pretty confusing to mix them ): ...
*/
public enum ExecutionStatus {
public enum ExecutionStatus implements VeniceEnumValue {
/** Job doesn't yet exist */
NOT_CREATED(true, false, false, false, 0),

Expand Down Expand Up @@ -139,6 +139,8 @@ public enum ExecutionStatus {
this.value = value;
}

private static final List<ExecutionStatus> TYPES = EnumUtils.getEnumValuesList(ExecutionStatus.class);

/**
* Some of the statuses are like watermark. These statuses are used in {@link PushMonitor} and
* {@link com.linkedin.venice.router.api.VeniceVersionFinder} to determine whether a job is finished
Expand Down Expand Up @@ -178,24 +180,13 @@ public static boolean isIncrementalPushStatus(int statusVal) {
|| statusVal == END_OF_INCREMENTAL_PUSH_RECEIVED.getValue();
}

@Override
public int getValue() {
return value;
}

/**
* Get ExecutionStatus from integer ordinal value in avro.
*/
private static final Map<Integer, ExecutionStatus> idMapping = new HashMap<>();
static {
Arrays.stream(values()).forEach(s -> idMapping.put(s.value, s));
}

public static ExecutionStatus fromInt(int v) {
ExecutionStatus status = idMapping.get(v);
if (status == null) {
return ExecutionStatus.UNKNOWN;
}
return status;
public static ExecutionStatus valueOf(int value) {
return EnumUtils.valueOf(TYPES, value, ExecutionStatus.class);
}

public ExecutionStatus getRootStatus() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;


Expand Down Expand Up @@ -56,19 +58,47 @@ public static <V extends VeniceEnumValue> List<V> getEnumValuesList(Class<V> enu
return Collections.unmodifiableList(Arrays.asList(array));
}

/**
* This is a relaxed version of {@link #getEnumValuesList(Class)} which returns a map instead of a list.
* This is useful when the values are not contiguous, or when the values are not starting from 0.
*/
public static <V extends VeniceEnumValue> Map<Integer, V> getEnumValuesListRelaxed(Class<V> enumToProvideArrayOf) {
nisargthakkar marked this conversation as resolved.
Show resolved Hide resolved
String name = enumToProvideArrayOf.getSimpleName();
Map<Integer, V> map = new HashMap<>();
for (V type: enumToProvideArrayOf.getEnumConstants()) {
if (map.put(type.getValue(), type) != null) {
throw new IllegalStateException(name + " values must be unique!");
}
}
return Collections.unmodifiableMap(map);
}

public static <V extends VeniceEnumValue> V valueOf(List<V> valuesList, int value, Class<V> enumClass) {
return valueOf(valuesList, value, enumClass, VeniceException::new);
}

public static <V extends VeniceEnumValue> V valueOf(Map<Integer, V> valuesMap, int value, Class<V> enumClass) {
return valueOf(valuesMap, value, enumClass, VeniceException::new);
}

public static <V extends VeniceEnumValue> V valueOf(
List<V> valuesList,
Object values,
nisargthakkar marked this conversation as resolved.
Show resolved Hide resolved
int value,
Class<V> enumClass,
Function<String, VeniceException> exceptionConstructor) {
try {
return valuesList.get(value);
} catch (IndexOutOfBoundsException e) {
throw exceptionConstructor.apply("Invalid enum value for " + enumClass.getSimpleName() + ": " + value);
if (values instanceof List) {
try {
return ((List<V>) values).get(value);
} catch (IndexOutOfBoundsException e) {
throw exceptionConstructor.apply("Invalid enum value for " + enumClass.getSimpleName() + ": " + value);
}
} else if (values instanceof Map) {
if (!((Map<Integer, V>) values).containsKey(value)) {
throw exceptionConstructor.apply("Invalid enum value for " + enumClass.getSimpleName() + ": " + value);
}
return ((Map<Integer, V>) values).get(value);
} else {
throw new IllegalArgumentException("Invalid values type: " + values.getClass().getSimpleName());
}
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package com.linkedin.venice.pushmonitor;

import static com.linkedin.venice.pushmonitor.ExecutionStatus.ARCHIVED;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.CATCH_UP_BASE_TOPIC_OFFSET_LAG;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.COMPLETED;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.DATA_RECOVERY_COMPLETED;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.DROPPED;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_DISK_FULL;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED;
Expand All @@ -12,21 +10,15 @@
import static com.linkedin.venice.pushmonitor.ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.END_OF_PUSH_RECEIVED;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.ERROR;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.NEW;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.NOT_CREATED;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.NOT_STARTED;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.PROGRESS;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.STARTED;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.START_OF_BUFFER_REPLAY_RECEIVED;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.START_OF_INCREMENTAL_PUSH_RECEIVED;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.TOPIC_SWITCH_RECEIVED;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.UNKNOWN;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.WARNING;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.isDeterminedStatus;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import com.linkedin.alpini.base.misc.CollectionUtil;
import com.linkedin.venice.utils.VeniceEnumValueTest;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -35,7 +27,39 @@
import org.testng.annotations.Test;


public class ExecutionStatusTest {
public class ExecutionStatusTest extends VeniceEnumValueTest<ExecutionStatus> {
public ExecutionStatusTest() {
super(ExecutionStatus.class);
}

@Override
protected Map<Integer, ExecutionStatus> expectedMapping() {
return CollectionUtil.<Integer, ExecutionStatus>mapBuilder()
.put(0, ExecutionStatus.NOT_CREATED)
.put(1, ExecutionStatus.NEW)
.put(2, ExecutionStatus.STARTED)
.put(3, ExecutionStatus.PROGRESS)
.put(4, ExecutionStatus.END_OF_PUSH_RECEIVED)
.put(5, ExecutionStatus.START_OF_BUFFER_REPLAY_RECEIVED)
.put(6, ExecutionStatus.TOPIC_SWITCH_RECEIVED)
.put(7, ExecutionStatus.START_OF_INCREMENTAL_PUSH_RECEIVED)
.put(8, ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED)
.put(9, ExecutionStatus.DROPPED)
.put(10, ExecutionStatus.COMPLETED)
.put(11, ExecutionStatus.WARNING)
.put(12, ExecutionStatus.ERROR)
.put(13, ExecutionStatus.CATCH_UP_BASE_TOPIC_OFFSET_LAG)
.put(14, ExecutionStatus.ARCHIVED)
.put(15, ExecutionStatus.UNKNOWN)
.put(16, ExecutionStatus.NOT_STARTED)
.put(17, ExecutionStatus.DATA_RECOVERY_COMPLETED)
.put(18, ExecutionStatus.DVC_INGESTION_ERROR_DISK_FULL)
.put(19, ExecutionStatus.DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED)
.put(20, ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES)
.put(21, ExecutionStatus.DVC_INGESTION_ERROR_OTHER)
.build();
}

@Test
public void testisDVCIngestionError() {
for (ExecutionStatus status: ExecutionStatus.values()) {
Expand Down Expand Up @@ -73,43 +97,6 @@ public void testIsErrorWithInputString() {
assertFalse(ExecutionStatus.isError("123"));
}

/**
* Test to prevent unintentional changing of the values of ExecutionStatus
* as values are persisted and used across services
*/
@Test
public void testExecutionStatusValue() {
assertEquals(ExecutionStatus.values().length, 22);

for (ExecutionStatus status: ExecutionStatus.values()) {
assertEquals(status.getValue(), status.ordinal());
}

// check all the values in the enum one by one to make sure it's not changed
assertEquals(NOT_CREATED.getValue(), 0);
assertEquals(NEW.getValue(), 1);
assertEquals(STARTED.getValue(), 2);
assertEquals(PROGRESS.getValue(), 3);
assertEquals(END_OF_PUSH_RECEIVED.getValue(), 4);
assertEquals(START_OF_BUFFER_REPLAY_RECEIVED.getValue(), 5);
assertEquals(TOPIC_SWITCH_RECEIVED.getValue(), 6);
assertEquals(START_OF_INCREMENTAL_PUSH_RECEIVED.getValue(), 7);
assertEquals(END_OF_INCREMENTAL_PUSH_RECEIVED.getValue(), 8);
assertEquals(DROPPED.getValue(), 9);
assertEquals(COMPLETED.getValue(), 10);
assertEquals(WARNING.getValue(), 11);
assertEquals(ERROR.getValue(), 12);
assertEquals(CATCH_UP_BASE_TOPIC_OFFSET_LAG.getValue(), 13);
assertEquals(ARCHIVED.getValue(), 14);
assertEquals(UNKNOWN.getValue(), 15);
assertEquals(NOT_STARTED.getValue(), 16);
assertEquals(DATA_RECOVERY_COMPLETED.getValue(), 17);
assertEquals(DVC_INGESTION_ERROR_DISK_FULL.getValue(), 18);
assertEquals(DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED.getValue(), 19);
assertEquals(DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES.getValue(), 20);
assertEquals(DVC_INGESTION_ERROR_OTHER.getValue(), 21);
}

@Test
public void testIsUsedByDaVinciClientOnly() {
Set<ExecutionStatus> dvcOnly = new HashSet<>();
Expand Down
Loading
Loading