diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala index 3c7c65dcbe..8676256956 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala @@ -22,9 +22,10 @@ import org.apache.streampark.common.util.Logger import org.apache.streampark.flink.client.bean._ import org.apache.streampark.flink.proxy.FlinkShimsProxy +import java.security.Permission + import scala.language.{implicitConversions, reflectiveCalls} import scala.reflect.ClassTag - object FlinkClient extends Logger { private[this] val FLINK_CLIENT_HANDLER_CLASS_NAME = @@ -46,7 +47,13 @@ object FlinkClient extends Logger { "org.apache.streampark.flink.client.bean.TriggerSavepointRequest" -> "triggerSavepoint" def submit(submitRequest: SubmitRequest): SubmitResponse = { - proxy[SubmitResponse](submitRequest, submitRequest.flinkVersion, SUBMIT_REQUEST) + val securityManager = System.getSecurityManager + try { + System.setSecurityManager(new ExitSecurityManager()) + proxy[SubmitResponse](submitRequest, submitRequest.flinkVersion, SUBMIT_REQUEST) + } finally { + System.setSecurityManager(securityManager) + } } def cancel(stopRequest: CancelRequest): CancelResponse = { @@ -87,3 +94,13 @@ object FlinkClient extends Logger { } } + +/** Used to mask JVM requests for external operations */ +class ExitSecurityManager extends SecurityManager { + override def checkExit(status: Int): Unit = { + throw new SecurityException( + s"System.exit($status) was called in your flink job, The job has been stopped, please check your program...") + } + + override def checkPermission(perm: Permission): Unit = {} +}