Skip to content

Commit

Permalink
[FLINK-36039][autoscaler] Support clean historical event handler reco…
Browse files Browse the repository at this point in the history
…rds in JDBC event handler (#865)

---------

Co-authored-by: Rui Fan <fanrui@apache.org>
  • Loading branch information
RocMarshal and 1996fanrui committed Aug 26, 2024
1 parent 0ef627e commit 6a426b2
Show file tree
Hide file tree
Showing 9 changed files with 308 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
<td>Integer</td>
<td>The port of flink cluster when the flink-cluster fetcher is used.</td>
</tr>
<tr>
<td><h5>autoscaler.standalone.jdbc.event-handler.ttl</h5></td>
<td style="word-wrap: break-word;">90 d</td>
<td>Duration</td>
<td>The time to live, based on the create time, for the JDBC event handler records. When the config is set to '0', the TTL strategy for the records is disabled.</td>
</tr>
<tr>
<td><h5>autoscaler.standalone.jdbc.password-env-variable</h5></td>
<td style="word-wrap: break-word;">"JDBC_PWD"</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,27 @@
package org.apache.flink.autoscaler.jdbc.event;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nullable;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* The event handler which persists its events in a JDBC-related database.
Expand All @@ -38,13 +47,34 @@
* @param <Context> The job autoscaler context.
*/
@Experimental
@Slf4j
public class JdbcAutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>>
implements AutoScalerEventHandler<KEY, Context> {

private final JdbcEventInteractor jdbcEventInteractor;
private final Duration eventHandlerTtl;
@Nullable private final ScheduledExecutorService scheduledEventHandlerCleaner;

public JdbcAutoScalerEventHandler(JdbcEventInteractor jdbcEventInteractor) {
public JdbcAutoScalerEventHandler(
JdbcEventInteractor jdbcEventInteractor, Duration eventHandlerTtl) {
this.jdbcEventInteractor = jdbcEventInteractor;
this.eventHandlerTtl = Preconditions.checkNotNull(eventHandlerTtl);

if (eventHandlerTtl.toMillis() <= 0) {
this.scheduledEventHandlerCleaner = null;
return;
}
this.scheduledEventHandlerCleaner =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("jdbc-autoscaler-events-cleaner")
.setDaemon(true)
.build());
this.scheduledEventHandlerCleaner.scheduleAtFixedRate(
this::cleanExpiredEvents,
Duration.ofDays(1).toMillis(),
Duration.ofDays(1).toMillis(),
TimeUnit.MILLISECONDS);
}

@SneakyThrows
Expand Down Expand Up @@ -104,6 +134,48 @@ public void handleScalingEvent(
}
}

/**
 * Stops the background cleaner of expired events, if one was created and is still running.
 */
@Override
public void close() {
    final var cleaner = scheduledEventHandlerCleaner;
    // The cleaner is null when the TTL is disabled; an already stopped cleaner needs no action.
    if (cleaner == null || cleaner.isShutdown()) {
        return;
    }
    cleaner.shutdownNow();
}

/**
 * Deletes the event handler records whose create time is older than the configured TTL.
 *
 * <p>The cutoff is the interactor's current instant minus {@code eventHandlerTtl}. Records are
 * deleted in id ranges of {@code batchSize}, sleeping {@code sleepMs} after every full batch.
 * Once a range deletes fewer rows than {@code batchSize}, the minimal id is queried again, and
 * the cleanup finishes when no expired minimal id is returned anymore.
 *
 * <p>Failures are logged instead of being propagated. If the thread is interrupted while
 * sleeping between batches, the cleanup stops and the interrupt flag is restored.
 */
@VisibleForTesting
void cleanExpiredEvents() {
    final var batchSize = 2000;
    final var sleepMs = 1000;

    var date =
            Timestamp.from(
                    jdbcEventInteractor
                            .getCurrentInstant()
                            .minusMillis(eventHandlerTtl.toMillis()));
    // Declared outside the try block so that it can be reported on interruption as well.
    var deletedTotalCount = 0L;
    try {
        while (true) {
            Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date);
            if (Objects.isNull(minId)) {
                log.info(
                        "Deleted expired {} event handler records successfully",
                        deletedTotalCount);
                break;
            }

            long startId = minId;
            while (true) {
                int deletedCount =
                        jdbcEventInteractor.deleteExpiredEventsByIdRangeAndDate(
                                startId, startId + batchSize, date);
                // Accumulate every batch, otherwise the summary log would always report 0.
                deletedTotalCount += deletedCount;
                if (deletedCount != batchSize) {
                    // A partial batch: re-query the minimal id in the outer loop.
                    break;
                }
                Thread.sleep(sleepMs);
                startId += batchSize;
            }
        }
    } catch (InterruptedException e) {
        // close() stops the cleaner with shutdownNow(), which interrupts the sleep above.
        // Restore the flag so that the executing thread can observe the interruption.
        Thread.currentThread().interrupt();
        log.warn(
                "Interrupted while cleaning expired event handler records, {} records were deleted.",
                deletedTotalCount,
                e);
    } catch (Exception e) {
        log.error("Error in cleaning expired event handler records.", e);
    }
}

/**
* @return True means the existing event is still in the interval duration we can update it.
* Otherwise, it's too early, we should create a new one instead of updating it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.sql.Connection;
import java.sql.ResultSet;
Expand Down Expand Up @@ -152,4 +153,29 @@ protected List<AutoScalerEvent> queryEvents(String jobKey, String reason) throws
void setClock(@Nonnull Clock clock) {
    // Reject a null clock before replacing the one currently in use.
    Preconditions.checkNotNull(clock);
    this.clock = clock;
}

/**
 * Looks up the row with the smallest id in {@code t_flink_autoscaler_event_handler} and returns
 * its id if that row was created before the given timestamp.
 *
 * @param timestamp the exclusive upper bound of the create time.
 * @return the smallest id, or {@code null} if the table is empty or the row with the smallest
 *     id was not created before {@code timestamp}.
 * @throws Exception if the query cannot be executed.
 */
@Nullable
Long queryMinEventIdByCreateTime(Timestamp timestamp) throws Exception {
    var sql =
            "SELECT id from t_flink_autoscaler_event_handler "
                    + " where id = (SELECT id FROM t_flink_autoscaler_event_handler order by id asc limit 1) "
                    + " and create_time < ?";
    try (var pstmt = conn.prepareStatement(sql)) {
        pstmt.setObject(1, timestamp);
        // Close the result set explicitly instead of relying on the statement to do it.
        try (var resultSet = pstmt.executeQuery()) {
            return resultSet.next() ? resultSet.getLong(1) : null;
        }
    }
}

/**
 * Deletes the rows of {@code t_flink_autoscaler_event_handler} whose id is in the range
 * {@code [startId, endId)} and whose create time is before the given timestamp.
 *
 * @param startId the inclusive lower bound of the id range.
 * @param endId the exclusive upper bound of the id range.
 * @param timestamp the exclusive upper bound of the create time.
 * @return the row count reported by the executed delete statement.
 * @throws Exception if the statement cannot be executed.
 */
int deleteExpiredEventsByIdRangeAndDate(long startId, long endId, Timestamp timestamp)
        throws Exception {
    final var deleteSql =
            "delete from t_flink_autoscaler_event_handler where id >= ? and id < ? and create_time < ?";
    try (var deleteStatement = conn.prepareStatement(deleteSql)) {
        // Bind in placeholder order: id lower bound, id upper bound, create time bound.
        deleteStatement.setObject(1, startId);
        deleteStatement.setObject(2, endId);
        deleteStatement.setObject(3, timestamp);
        final int affectedRows = deleteStatement.executeUpdate();
        return affectedRows;
    }
}
}
Loading

0 comments on commit 6a426b2

Please sign in to comment.