Skip to content

Commit

Permalink
[scheduler] Admission rotation window + improve hash distribution (#865)
Browse files Browse the repository at this point in the history
The current mechanism will always penalize the same ids. This PR
introduces a time-based window mechanism to rotate which IDs are
rejected over time. I've also added a multiplication with a large prime
to improve the hash distribution in case the incoming IDs aren't well
distributed.
  • Loading branch information
fwbrasil authored Nov 26, 2024
1 parent c7a6e46 commit e996c17
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,28 @@ import scala.util.hashing.MurmurHash3
*
* ==Rejection Mechanism==
*
* Tasks are rejected using a deterministic hashing mechanism that provides stable and consistent admission decisions. Each task key
* (string or integer) is hashed to a value between 0-99, and tasks with hash values above the current admission percentage are rejected.
* This approach ensures the same keys will be consistently rejected until the admission percentage changes.
* Tasks are rejected using a deterministic hashing mechanism that provides stable and consistent admission decisions within time windows.
* Each task key (string or integer) is mixed with a large prime multiplier and the index of the current rotation window, then reduced
* modulo 100 to a value between 0 and 99. Tasks whose value is above the current admission percentage are rejected.
*
* This deterministic approach provides significant benefits:
* - Individual users/tasks receive consistent service or rejection
* - Related requests from the same user/session get uniform treatment
* - Retries from rejected tasks won't add load since they'll stay rejected
* - Accepted tasks can proceed without interruption
* The rotation mechanism is particularly important for fairness when task keys represent user identifiers:
* - Without rotation, users whose IDs hash to high values would face persistent rejection during system pressure
* - This could lead to poor user experience where some users are consistently locked out while others maintain access
* - Rotation ensures rejection patterns shift periodically, giving previously rejected users opportunities for admission
* - The rotation window duration can be tuned to balance stability and fairness
*
* For example, with a 60-minute rotation window:
* - User A might be rejected in the first window due to their ID's hash value
* - In the next window, the time-based component changes the hash calculation
* - User A now has a different effective priority and may be admitted while others are rejected
* - This prevents any user from experiencing extended denial of service
*
* This approach ensures:
* - Consistent decisions within each rotation window
* - Even distribution of rejections across the key space
* - Periodic rotation to prevent unfair persistent rejection
* - More even spreading of keys across the 0-99 range by multiplying with a large prime
* - Fair access patterns over time for all users
*
* ==Load Shedding Pattern==
*
Expand All @@ -43,13 +56,13 @@ import scala.util.hashing.MurmurHash3
* - Additional tasks are rejected if pressure continues building
* - System stabilizes at lower load with a stable subset of traffic
* - During recovery, admission percentage gradually increases
* - Previously rejected tasks are readmitted in the same order they were rejected
* - Previously rejected tasks may be admitted in new time windows
*
* ==Backpressure Characteristics==
*
* This design creates an effective backpressure mechanism with several key characteristics. Load reduces predictably as the admission
* percentage drops, avoiding the oscillation patterns common with random rejection strategies. The system maintains a stable subset of
* flowing traffic, while providing natural queue-like behavior for rejected requests.
* flowing traffic within each time window, while providing natural queue-like behavior for rejected requests.
*
* ==Distributed Systems Context==
*
Expand All @@ -67,6 +80,8 @@ import scala.util.hashing.MurmurHash3
* Timer for scheduling periodic regulation
* @param config
* Configuration parameters controlling admission behavior
* @param rotationWindow
* Duration after which the rejection pattern rotates to allow previously rejected keys another chance
*
* @see
* Regulator for details on the underlying regulation mechanism
Expand All @@ -76,9 +91,12 @@ final class Admission(
schedule: Task => Unit,
nowMillis: LongSupplier,
timer: InternalTimer,
config: Config = Admission.defaultConfig
config: Config = Admission.defaultConfig,
rotationWindow: Duration = Flag("admission.rotationWindowMinutes", 60).minutes
) extends Regulator(loadAvg, timer, config) {

private val largePrime = (Math.pow(2, 31) - 1).toInt

@volatile private var admissionPercent = 100

private val rejected = new LongAdder
Expand Down Expand Up @@ -155,7 +173,8 @@ final class Admission(
* true if the task should be rejected, false if it should be admitted
*/
def reject(key: Int): Boolean = {
    // Index of the current rotation window; the +1 keeps the multiplier non-zero for the first window.
    val windowId = (nowMillis.getAsLong() / rotationWindow.toMillis).toInt + 1
    // Int multiplication wraps on overflow; the wrapped value is only used as a hash.
    val mixed = key * largePrime * windowId
    // Take the remainder BEFORE abs: `mixed % 100` is always within -99..99, so abs cannot overflow.
    // The previous `mixed.abs % 100` yields a negative bucket when `mixed == Int.MinValue`
    // (Int.MinValue.abs is still negative), which made such keys unconditionally admitted.
    // For every other value both forms produce the same bucket.
    val bucket = (mixed % 100).abs
    val r = bucket > admissionPercent
    // Track the decision in the matching counter.
    if (r) rejected.increment()
    else allowed.increment()
    r
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,58 @@ class AdmissionTest extends AnyFreeSpec with NonImplicitAssertions {
}
}

"rotation window" - {
"rejection set varies over time with fixed load" in new Context {
    // Fixed load and jitter for the whole test; only the clock moves between samples.
    loadAvg = 0.9
    jitter = jitterUpperThreshold * 10

    timer.advanceAndRun(regulateInterval * 2)
    assert(admission.percent() == 97)

    val keys = (1 to 1000).toSet

    // Keys from the fixed key set that are rejected at the current time.
    def rejectedNow(): Set[Int] = keys.filter(key => admission.reject(key))

    // First window.
    val window1Rejects = rejectedNow()
    assert(window1Rejects.size > 0)

    // Second window: one hour later, admission percentage unchanged.
    timer.advance(1.hour)
    assert(admission.percent() == 97)
    val window2Rejects = rejectedNow()
    assert(window2Rejects.size > 0)

    // Third window: another hour later, admission percentage still unchanged.
    timer.advance(1.hour)
    assert(admission.percent() == 97)
    val window3Rejects = rejectedNow()
    assert(window3Rejects.size > 0)

    // The rejected set must differ between every pair of windows.
    assert(window1Rejects != window2Rejects)
    assert(window2Rejects != window3Rejects)
    assert(window1Rejects != window3Rejects)

    // Each window rejects roughly 3% of the 1000 keys (30 +/- 10).
    Seq(window1Rejects, window2Rejects, window3Rejects).foreach { rejects =>
        assert((rejects.size - 30).abs <= 10)
    }
}

"different loads give proportionally different rejection sets" in new Context {
    jitter = jitterUpperThreshold * 10
    val keys = (1 to 1000).toSet

    // Phase 1: load of 0.9 brings the admission percentage to 97.
    loadAvg = 0.9
    timer.advanceAndRun(regulateInterval * 2)
    assert(admission.percent() == 97)
    val highAcceptRejects = keys.filter(key => admission.reject(key))

    // Phase 2: load of 0.95 brings the admission percentage down to 59.
    loadAvg = 0.95
    timer.advanceAndRun(regulateInterval * 4)
    assert(admission.percent() == 59)

    // Sample again one hour later, at the lower admission percentage.
    timer.advance(1.hour)
    val lowAcceptRejects = keys.filter(key => admission.reject(key))

    // A lower admission percentage must reject more keys.
    assert(highAcceptRejects.size < lowAcceptRejects.size)
}
}

trait Context {
val timer = TestTimer()
var loadAvg: Double = 0.8
Expand Down

0 comments on commit e996c17

Please sign in to comment.