Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1923] Add retention for lease arbiter table #3792

Merged
merged 6 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ public class ConfigurationKeys {
public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE = "gobblin_multi_active_scheduler_constants_store";
public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiter.store.db.table";
public static final String DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = "gobblin_scheduler_lease_determination_store";
public static final String SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".retentionPeriodMillis";
public static final int DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS = 300000000;
umustafi marked this conversation as resolved.
Show resolved Hide resolved
// Refers to the event we originally tried to acquire a lease which achieved `consensus` among participants through
// the database
public static final String SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY = "preservedConsensusEventTimeMillis";
Expand All @@ -116,7 +118,7 @@ public class ConfigurationKeys {
public static final String SCHEDULER_EVENT_LINGER_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".lingerMillis";
public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 30000;
public static final String SCHEDULER_MAX_BACKOFF_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".maxBackoffMillis";
public static final int DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS = 5000;
public static final int DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS = 10000;

// Job executor thread pool size
public static final String JOB_EXECUTOR_THREAD_POOL_SIZE_KEY = "jobexecutor.threadpool.size";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.Calendar;
import java.util.Optional;
import java.util.TimeZone;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -87,13 +89,14 @@ protected interface CheckedFunction<T, R> {
private final String constantsTableName;
private final int epsilon;
private final int linger;
private final int retention;
umustafi marked this conversation as resolved.
Show resolved Hide resolved
private String thisTableRetentionStatement;
private String thisTableGetInfoStatement;
private String thisTableGetInfoStatementForReminder;
private String thisTableSelectAfterInsertStatement;
private String thisTableAcquireLeaseIfMatchingAllStatement;
private String thisTableAcquireLeaseIfFinishedStatement;

// TODO: define retention on this table
/*
Notes:
- Set `event_timestamp` default value to turn off timestamp auto-updates for row modifications which alters this col
Expand All @@ -110,9 +113,13 @@ protected interface CheckedFunction<T, R> {
private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
+ "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar("
+ ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + " flow_action varchar(100) NOT NULL, "
+ "event_timestamp TIMESTAMP(3) DEFAULT CURRENT_TIMESTAMP(3), "
+ "lease_acquisition_timestamp TIMESTAMP(3) NULL DEFAULT NULL, "
+ "event_timestamp TIMESTAMP NOT NULL, "
+ "lease_acquisition_timestamp TIMESTAMP NULL, "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as far as migrating this schema... will it require manual intervention to either drop or alter table?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I believe we'll have to drop that table. I will do that before deploying.

+ "PRIMARY KEY (flow_group,flow_name,flow_action))";
// Deletes rows older than retention time period regardless of lease status as they should all be invalid or completed
// since retention >> linger
private static final String LEASE_ARBITER_TABLE_RETENTION_STATEMENT = "DELETE FROM %s WHERE event_timestamp < "
+ "DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? * 1000 MICROSECOND)";
private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s "
+ "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY (primary_key))";
// Only insert epsilon and linger values from config if this table does not contain a pre-existing values already.
Expand Down Expand Up @@ -163,7 +170,8 @@ protected interface CheckedFunction<T, R> {
// Complete lease acquisition if values have not changed since lease was acquired
protected static final String CONDITIONALLY_COMPLETE_LEASE_STATEMENT = "UPDATE %s SET "
+ "lease_acquisition_timestamp = NULL " + WHERE_CLAUSE_TO_MATCH_ROW;
protected static final Calendar UTC_CAL = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
private static final ThreadLocal<Calendar> UTC_CAL =
ThreadLocal.withInitial(() -> Calendar.getInstance(TimeZone.getTimeZone("UTC")));

@Inject
public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
Expand All @@ -182,6 +190,9 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS);
this.linger = ConfigUtils.getInt(config, ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
this.retention = ConfigUtils.getInt(config, ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS);
this.thisTableRetentionStatement = String.format(LEASE_ARBITER_TABLE_RETENTION_STATEMENT, this.leaseArbiterTableName);
this.thisTableGetInfoStatement = String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName,
this.constantsTableName);
this.thisTableGetInfoStatementForReminder = String.format(GET_EVENT_INFO_STATEMENT_FOR_REMINDER,
Expand All @@ -203,7 +214,7 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
throw new IOException("Table creation failure for " + leaseArbiterTableName, e);
}
initializeConstantsTable();

runRetentionOnArbitrationTable();
log.info("MysqlMultiActiveLeaseArbiter initialized");
}

Expand All @@ -221,6 +232,35 @@ private void initializeConstantsTable() throws IOException {
}, true);
}

/**
* Periodically deletes all rows in the table with event_timestamp older than the retention period defined by config.
*/
private void runRetentionOnArbitrationTable() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small nit but can we add the code for starting a thread within this method? Due to how it's implemented as an infinite loop we would generally always want this function to be run asynchronously

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems worth abstracting into a utility along the lines of "run this arbitrary SQL in a STPE using interval T"

you decide right now whether to do or merely "TODO" ;)

ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
Runnable retentionTask = () -> {
try {
Thread.sleep(10000);
umustafi marked this conversation as resolved.
Show resolved Hide resolved
withPreparedStatement(thisTableRetentionStatement,
retentionStatement -> {
int i = 0;
retentionStatement.setInt(++i, retention);
umustafi marked this conversation as resolved.
Show resolved Hide resolved
int numRowsDeleted = retentionStatement.executeUpdate();
if (numRowsDeleted != 0) {
log.info("Multi-active lease arbiter retention thread deleted {} rows from the lease arbiter table",
numRowsDeleted);
}
return numRowsDeleted;
}, true);
} catch (InterruptedException | IOException e) {
log.warn("Failing to run retention on lease arbiter table. Unbounded growth can lead to database slowness and "
umustafi marked this conversation as resolved.
Show resolved Hide resolved
+ "affect our system performance. Examine exception: ", e);
}
};

// Run retention thread every 4 hours (6 times a day)
executor.scheduleAtFixedRate(retentionTask, 0, 4, TimeUnit.HOURS);
}

@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis,
boolean isReminderEvent) throws IOException {
Expand Down Expand Up @@ -340,12 +380,12 @@ protected Optional<GetEventInfoResult> getExistingEventInfo(DagActionStore.DagAc
protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws IOException {
try {
// Extract values from result set
Timestamp dbEventTimestamp = resultSet.getTimestamp("utc_event_timestamp", UTC_CAL);
Timestamp dbLeaseAcquisitionTimestamp = resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL);
Timestamp dbEventTimestamp = resultSet.getTimestamp("utc_event_timestamp", UTC_CAL.get());
Timestamp dbLeaseAcquisitionTimestamp = resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL.get());
boolean withinEpsilon = resultSet.getBoolean("is_within_epsilon");
int leaseValidityStatus = resultSet.getInt("lease_validity_status");
int dbLinger = resultSet.getInt("linger");
Timestamp dbCurrentTimestamp = resultSet.getTimestamp("utc_current_timestamp", UTC_CAL);
Timestamp dbCurrentTimestamp = resultSet.getTimestamp("utc_current_timestamp", UTC_CAL.get());
return new GetEventInfoResult(dbEventTimestamp, dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus,
dbLinger, dbCurrentTimestamp);
} catch (SQLException e) {
Expand Down Expand Up @@ -423,14 +463,14 @@ protected static SelectInfoResult createSelectInfoResult(ResultSet resultSet) th
throw new IOException("Expected resultSet containing row information for the lease that was attempted but "
+ "received nothing.");
}
if (resultSet.getTimestamp("utc_event_timestamp", UTC_CAL) == null) {
if (resultSet.getTimestamp("utc_event_timestamp", UTC_CAL.get()) == null) {
throw new IOException("event_timestamp should never be null (it is always set to current timestamp)");
}
long eventTimeMillis = resultSet.getTimestamp("utc_event_timestamp", UTC_CAL).getTime();
long eventTimeMillis = resultSet.getTimestamp("utc_event_timestamp", UTC_CAL.get()).getTime();
// Lease acquisition timestamp is null if another participant has completed the lease
Optional<Long> leaseAcquisitionTimeMillis =
resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL) == null ? Optional.empty() :
Optional.of(resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL).getTime());
resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL.get()) == null ? Optional.empty() :
Optional.of(resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL.get()).getTime());
int dbLinger = resultSet.getInt("linger");
return new SelectInfoResult(eventTimeMillis, leaseAcquisitionTimeMillis, dbLinger);
} catch (SQLException e) {
Expand Down Expand Up @@ -526,10 +566,10 @@ protected static void completeUpdatePreparedStatement(PreparedStatement statemen
statement.setString(++i, flowAction.getFlowActionType().toString());
// Values that may be needed depending on the insert statement
if (needEventTimeCheck) {
statement.setTimestamp(++i, originalEventTimestamp, UTC_CAL);
statement.setTimestamp(++i, originalEventTimestamp, UTC_CAL.get());
}
if (needLeaseAcquisitionTimeCheck) {
statement.setTimestamp(++i, originalLeaseAcquisitionTimestamp, UTC_CAL);
statement.setTimestamp(++i, originalLeaseAcquisitionTimestamp, UTC_CAL.get());
}
}

Expand All @@ -546,8 +586,8 @@ public boolean recordLeaseSuccess(LeaseObtainedStatus status)
updateStatement.setString(++i, flowGroup);
updateStatement.setString(++i, flowName);
updateStatement.setString(++i, flowActionType.toString());
updateStatement.setTimestamp(++i, new Timestamp(status.getEventTimestamp()), UTC_CAL);
updateStatement.setTimestamp(++i, new Timestamp(status.getLeaseAcquisitionTimestamp()), UTC_CAL);
updateStatement.setTimestamp(++i, new Timestamp(status.getEventTimestamp()), UTC_CAL.get());
updateStatement.setTimestamp(++i, new Timestamp(status.getLeaseAcquisitionTimestamp()), UTC_CAL.get());
int numRowsUpdated = updateStatement.executeUpdate();
if (numRowsUpdated == 0) {
log.info("Multi-active lease arbiter lease attempt: [{}, eventTimestamp: {}] - FAILED to complete because "
Expand Down
Loading