From 9316e78357eeb2b02b62636a8aa17aaaa0aaac4b Mon Sep 17 00:00:00 2001 From: Chengyu Yan Date: Sat, 2 Dec 2023 12:28:23 +0800 Subject: [PATCH] [Improve][Zeta] Remove assert key words (#5947) --- release-note.md | 1 + .../src/main/java/io/debezium/relational/TableId.java | 4 ++-- .../engine/server/checkpoint/CheckpointCoordinator.java | 8 +++++++- .../engine/server/rest/RestHttpGetCommandProcessor.java | 1 - 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/release-note.md b/release-note.md index 147823065f1b..acfeb8fe8e13 100644 --- a/release-note.md +++ b/release-note.md @@ -150,6 +150,7 @@ - [Zeta] Improve Zeta operation max count and ignore NPE (#4787) - [Zeta] Remove serialize(deserialize) cost when use shuffle action (#4722) - [zeta] Checkpoint exception status messages exclude state data (#5547) +- [Zeta] Remove assert key words (#5947) ## Feature diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/io/debezium/relational/TableId.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/io/debezium/relational/TableId.java index eea2d050e90e..09a82876849f 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/io/debezium/relational/TableId.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/io/debezium/relational/TableId.java @@ -20,6 +20,7 @@ import io.debezium.annotation.Immutable; import io.debezium.relational.Selectors.TableIdToStringMapper; import io.debezium.schema.DataCollectionId; +import lombok.NonNull; import java.io.Serializable; @@ -97,12 +98,11 @@ protected static TableId parse(String[] parts, int numParts, boolean useCatalogB public TableId( String catalogName, String schemaName, - String tableName, + @NonNull String tableName, TableIdToStringMapper tableIdMapper) { this.catalogName = catalogName; this.schemaName = schemaName; this.tableName = tableName; - assert this.tableName != null; this.id = tableIdMapper == null ? tableId(this.catalogName, this.schemaName, this.tableName) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index 09bf416f6a92..656dcf07ebc3 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -606,7 +606,13 @@ CompletableFuture triggerPendingCheckpoint( long triggerTimestamp, CompletableFuture idFuture, CheckpointType checkpointType) { - assert Thread.holdsLock(lock); + if (!Thread.holdsLock(lock)) { + throw new RuntimeException( + String.format( + "Unsafe invoke, the current thread[%s] has not acquired the lock[%s].", + Thread.currentThread().getName(), this.lock.toString())); + } + latestTriggerTimestamp.set(triggerTimestamp); return idFuture.thenApplyAsync( checkpointId -> diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java index 9addb8d8ec8a..484482322c89 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java @@ -133,7 +133,6 @@ private void getSystemMonitoringInformation(HttpGetCommand command) { } catch (InterruptedException | ExecutionException e) { logger.severe("get system monitoring information fail", e); } - assert input != null; String[] parts = input.split(", "); JsonObject jobInfo = new JsonObject(); Arrays.stream(parts)