Skip to content

Commit

Permalink
Enable set ServerLoadProtection fot Master/Worker
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Jan 8, 2024
1 parent 309c8c9 commit 9f46edb
Show file tree
Hide file tree
Showing 37 changed files with 934 additions and 783 deletions.
682 changes: 344 additions & 338 deletions deploy/kubernetes/dolphinscheduler/README.md

Large diffs are not rendered by default.

27 changes: 18 additions & 9 deletions deploy/kubernetes/dolphinscheduler/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -506,10 +506,15 @@ master:
MASTER_TASK_COMMIT_INTERVAL: "1s"
# -- master state wheel interval, the unit is second
MASTER_STATE_WHEEL_INTERVAL: "5s"
# -- Master max cpuload avg, only higher than the system cpu load average, master server can schedule
MASTER_MAX_CPU_LOAD_AVG: "1"
# -- Master reserved memory, only lower than system available memory, master server can schedule, the unit is G
MASTER_RESERVED_MEMORY: "0.3"
MASTER_SERVER_LOAD_PROTECTION_ENABLED: true
# Master max cpu usage, when the master's cpu usage is smaller then this value, master server can execute workflow.
MASTER_SERVER_LOAD_PROTECTION_MAX_CPU_USAGE_PERCENTAGE_THRESHOLDS: 0.7
# Master max JVM memory usage , when the master's jvm memory usage is smaller then this value, master server can execute workflow.
MASTER_SERVER_LOAD_PROTECTION_MAX_JVM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS: 0.7
# Master max System memory usage , when the master's system memory usage is smaller then this value, master server can execute workflow.
MASTER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS: 0.7
# Master max disk usage , when the master's disk usage is smaller then this value, master server can execute workflow.
MASTER_SERVER_LOAD_PROTECTION_MAX_DISK_USAGE_PERCENTAGE_THRESHOLDS: 0.7
# -- Master failover interval, the unit is minute
MASTER_FAILOVER_INTERVAL: "10m"
# -- Master kill application when handle failover
Expand Down Expand Up @@ -621,11 +626,15 @@ worker:
# -- `PersistentVolumeClaim` size
storage: "20Gi"
env:
# -- Worker max cpu load avg, only higher than the system cpu load average, worker server can be dispatched tasks
WORKER_MAX_CPU_LOAD_AVG: "1"
# -- Worker reserved memory, only lower than system available memory, worker server can be dispatched tasks, the unit is G
WORKER_RESERVED_MEMORY: "0.3"
# -- Worker execute thread number to limit task instances
WORKER_SERVER_LOAD_PROTECTION_ENABLED: true
# Worker max cpu usage, when the worker's cpu usage is smaller then this value, worker server can be dispatched tasks.
WORKER_SERVER_LOAD_PROTECTION_MAX_CPU_USAGE_PERCENTAGE_THRESHOLDS: 0.7
# Worker max jvm memory usage , when the worker's jvm memory usage is smaller then this value, worker server can be dispatched tasks.
WORKER_SERVER_LOAD_PROTECTION_MAX_JVM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS: 0.7
# Worker max memory usage , when the worker's memory usage is smaller then this value, worker server can be dispatched tasks.
WORKER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS: 0.7
# Worker max disk usage , when the worker's disk usage is smaller then this value, worker server can be dispatched tasks.
WORKER_SERVER_LOAD_PROTECTION_MAX_DISK_USAGE_PERCENTAGE_THRESHOLDS: 0.7
WORKER_EXEC_THREADS: "100"
# -- Worker heartbeat interval, the unit is second
WORKER_HEARTBEAT_INTERVAL: "10s"
Expand Down
74 changes: 39 additions & 35 deletions docs/docs/en/architecture/configuration.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;

Expand All @@ -37,14 +39,18 @@ public class AlertHeartbeatTask extends BaseHeartBeatTask<AlertServerHeartBeat>
private final AlertConfig alertConfig;
private final Integer processId;
private final RegistryClient registryClient;

private final MetricsProvider metricsProvider;
private final String heartBeatPath;
private final long startupTime;

public AlertHeartbeatTask(AlertConfig alertConfig,
MetricsProvider metricsProvider,
RegistryClient registryClient) {
super("AlertHeartbeatTask", alertConfig.getHeartbeatInterval().toMillis());
this.startupTime = System.currentTimeMillis();
this.alertConfig = alertConfig;
this.metricsProvider = metricsProvider;
this.registryClient = registryClient;
this.heartBeatPath =
RegistryNodeType.ALERT_SERVER.getRegistryPath() + "/" + alertConfig.getAlertServerAddress();
Expand All @@ -53,13 +59,14 @@ public AlertHeartbeatTask(AlertConfig alertConfig,

@Override
public AlertServerHeartBeat getHeartBeat() {
SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();
return AlertServerHeartBeat.builder()
.processId(processId)
.startupTime(startupTime)
.reportTime(System.currentTimeMillis())
.cpuUsage(OSUtils.cpuUsagePercentage())
.memoryUsage(OSUtils.memoryUsagePercentage())
.availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize())
.cpuUsage(systemMetrics.getTotalCpuUsedPercentage())
.memoryUsage(systemMetrics.getSystemMemoryUsedPercentage())
.jvmMemoryUsage(systemMetrics.getJvmMemoryUsedPercentage())
.host(NetUtils.getHost())
.port(alertConfig.getPort())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.alert.registry;

import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;

Expand All @@ -36,12 +37,15 @@ public class AlertRegistryClient implements AutoCloseable {
@Autowired
private AlertConfig alertConfig;

@Autowired
private MetricsProvider metricsProvider;

private AlertHeartbeatTask alertHeartbeatTask;

public void start() {
log.info("AlertRegistryClient starting...");
registryClient.getLock(RegistryNodeType.ALERT_LOCK.getRegistryPath());
alertHeartbeatTask = new AlertHeartbeatTask(alertConfig, registryClient);
alertHeartbeatTask = new AlertHeartbeatTask(alertConfig, metricsProvider, registryClient);
alertHeartbeatTask.start();
// start heartbeat task
log.info("AlertRegistryClient started...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

public enum ServerStatus {

NORMAL, ABNORMAL, BUSY
NORMAL,
BUSY

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class AlertServerHeartBeat implements HeartBeat {
private long reportTime;
private double cpuUsage;
private double memoryUsage;
private double availablePhysicalMemorySize;
private double jvmMemoryUsage;

private String host;
private int port;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@ public class MasterHeartBeat implements HeartBeat {
private long startupTime;
private long reportTime;
private double cpuUsage;
private double jvmMemoryUsage;
private double memoryUsage;
private double availablePhysicalMemorySize;
private double reservedMemory;
private double diskAvailable;
private int processId;
private double diskUsage;
private ServerStatus serverStatus;
private int processId;

private String host;
private int port;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,16 @@ public class WorkerHeartBeat implements HeartBeat {
private long startupTime;
private long reportTime;
private double cpuUsage;
private double jvmMemoryUsage;
private double memoryUsage;
private double loadAverage;
private double availablePhysicalMemorySize;
private double reservedMemory;
private double diskAvailable;
private double diskUsage;
private ServerStatus serverStatus;
private int processId;

private String host;
private int port;

private int workerHostWeight; // worker host weight
private int workerWaitingTaskCount; // worker waiting task count
private int workerExecThreadCount; // worker thread pool thread count
private int threadPoolUsage; // worker waiting task count

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,136 +21,49 @@
import org.apache.dolphinscheduler.common.shell.ShellExecutor;

import oshi.SystemInfo;
import oshi.hardware.CentralProcessor;
import oshi.hardware.GlobalMemory;
import oshi.hardware.HardwareAbstractionLayer;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.math.RoundingMode;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.StringTokenizer;
import java.util.regex.Pattern;

import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;

/**
* os utils
*/
// todo: Split to WindowsOSUtils/LinuxOSUtils/MacOSOSUtils/K8sOSUtils...
@Slf4j
@UtilityClass
public class OSUtils {

private static final SystemInfo SI = new SystemInfo();
public static final String TWO_DECIMAL = "0.00";

/**
* return -1 when the function can not get hardware env info
* e.g {@link OSUtils#cpuUsagePercentage()}
*/
public static final double NEGATIVE_ONE = -1;

private static final HardwareAbstractionLayer hal = SI.getHardware();
private static long[] prevTicks = new long[CentralProcessor.TickType.values().length];
private static long prevTickTime = 0L;
private static volatile double cpuUsage = 0.0D;
private static final double TOTAL_MEMORY = hal.getMemory().getTotal() / 1024.0 / 1024 / 1024;

private OSUtils() {
throw new UnsupportedOperationException("Construct OSUtils");
}

/**
* Initialization regularization, solve the problem of pre-compilation performance,
* avoid the thread safety problem of multi-thread operation
*/
private static final Pattern PATTERN = Pattern.compile("\\s+");

/**
* get disk usage
* Keep 2 decimal
*
* @return disk free size, unit: GB
*/
public static double diskAvailable() {
File file = new File(".");
long freeSpace = file.getFreeSpace(); // unallocated / free disk space in bytes.

double diskAvailable = freeSpace / 1024.0 / 1024 / 1024;

DecimalFormat df = new DecimalFormat(TWO_DECIMAL);
df.setRoundingMode(RoundingMode.HALF_UP);
return Double.parseDouble(df.format(diskAvailable));
public static double getTotalSystemMemory() {
return hal.getMemory().getTotal();
}

/**
* get available physical or pod memory size
* <p>
* Keep 2 decimal
*
* @return Available physical or pod memory size, unit: G
*/
public static double availablePhysicalMemorySize() {
double availablePhysicalMemorySize;

if (KubernetesUtils.isKubernetesMode()) {
long freeMemory = Runtime.getRuntime().freeMemory();
availablePhysicalMemorySize = freeMemory / 1024.0 / 1024 / 1024;
} else {
GlobalMemory memory = hal.getMemory();
availablePhysicalMemorySize = memory.getAvailable() / 1024.0 / 1024 / 1024;
}
DecimalFormat df = new DecimalFormat(TWO_DECIMAL);
df.setRoundingMode(RoundingMode.HALF_UP);
return Double.parseDouble(df.format(availablePhysicalMemorySize));
}

/**
* get cpu usage
*
* @return cpu usage
*/
public static double cpuUsagePercentage() {
CentralProcessor processor = hal.getProcessor();

// Check if > ~ 0.95 seconds since last tick count.
long now = System.currentTimeMillis();
if (now - prevTickTime > 950) {
// Enough time has elapsed.
if (KubernetesUtils.isKubernetesMode()) {
OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean();
cpuUsage = operatingSystemMXBean.getSystemLoadAverage();
} else {
cpuUsage = processor.getSystemCpuLoadBetweenTicks(prevTicks);
}

prevTickTime = System.currentTimeMillis();
prevTicks = processor.getSystemCpuLoadTicks();
}

if (Double.isNaN(cpuUsage)) {
return NEGATIVE_ONE;
}

DecimalFormat df = new DecimalFormat(TWO_DECIMAL);
df.setRoundingMode(RoundingMode.HALF_UP);
return Double.parseDouble(df.format(cpuUsage));
}

public static double memoryUsagePercentage() {
return (TOTAL_MEMORY - availablePhysicalMemorySize()) / TOTAL_MEMORY;
public static double getSystemMemoryUsed() {
return hal.getMemory().getTotal() - hal.getMemory().getAvailable();
}

public static List<String> getUserList() {
Expand Down Expand Up @@ -433,43 +346,11 @@ public static String exeShell(String[] command) throws IOException {
return ShellExecutor.execCommand(command);
}

/**
* get process id
*
* @return process id
*/
public static int getProcessID() {
RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
return Integer.parseInt(runtimeMXBean.getName().split("@")[0]);
}

/**
* Check memory and cpu usage is overload the given thredshod.
*
* @param maxCpuLoadAvgThreshold maxCpuLoadAvg
* @param reservedMemoryThreshold reservedMemory
* @return True, if the cpu or memory exceed the given thredshod.
*/
public static Boolean isOverload(double maxCpuLoadAvgThreshold, double reservedMemoryThreshold) {
// system load average
double freeCPUPercentage = 1 - cpuUsagePercentage();
// system available physical memory
double freeMemoryPercentage = 1 - memoryUsagePercentage();
if (freeCPUPercentage > maxCpuLoadAvgThreshold) {
log.warn("Current cpu load average {} is too high, max.cpuLoad.avg={}", freeCPUPercentage,
maxCpuLoadAvgThreshold);
return true;
}

if (freeMemoryPercentage < reservedMemoryThreshold) {
log.warn(
"Current available memory percentage{} is too low, reserved.memory={}", freeMemoryPercentage,
reservedMemoryThreshold);
return true;
}
return false;
}

public static Boolean isWindows() {
return System.getProperty("os.name").startsWith("Windows");
}
Expand Down
Loading

0 comments on commit 9f46edb

Please sign in to comment.