Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revamped failure models #228

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions opendc-compute/opendc-compute-api/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ description = "API interface for the OpenDC Compute service"
plugins {
`kotlin-library-conventions`
}
dependencies {
implementation(project(mapOf("path" to ":opendc-simulator:opendc-simulator-compute")))
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

package org.opendc.compute.api

import org.opendc.simulator.compute.workload.SimWorkload
import java.util.UUID

/**
Expand Down Expand Up @@ -113,6 +114,11 @@ public interface ComputeClient : AutoCloseable {
start: Boolean = true,
): Server

public fun rescheduleServer(
server: Server,
workload: SimWorkload,
)

/**
* Release the resources associated with this client, preventing any further API calls.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@

package org.opendc.compute.carbon

import mu.KotlinLogging
import org.opendc.trace.Trace
import org.opendc.trace.conv.CARBON_INTENSITY_TIMESTAMP
import org.opendc.trace.conv.CARBON_INTENSITY_VALUE
import org.opendc.trace.conv.TABLE_CARBON_INTENSITY
import org.opendc.trace.conv.TABLE_CARBON_INTENSITIES
import java.io.File
import java.lang.ref.SoftReference
import java.time.Instant
Expand All @@ -38,11 +37,6 @@ import java.util.concurrent.ConcurrentHashMap
* @param baseDir The directory containing the traces.
*/
public class CarbonTraceLoader {
/**
* The logger for this instance.
*/
private val logger = KotlinLogging.logger {}

/**
* The cache of workloads.
*/
Expand All @@ -54,13 +48,11 @@ public class CarbonTraceLoader {
* Read the metadata into a workload.
*/
private fun parseCarbon(trace: Trace): List<CarbonFragment> {
val reader = checkNotNull(trace.getTable(TABLE_CARBON_INTENSITY)).newReader()
val reader = checkNotNull(trace.getTable(TABLE_CARBON_INTENSITIES)).newReader()

val startTimeCol = reader.resolve(CARBON_INTENSITY_TIMESTAMP)
val carbonIntensityCol = reader.resolve(CARBON_INTENSITY_VALUE)

val entries = mutableListOf<CarbonFragment>()

try {
while (reader.nextRow()) {
val startTime = reader.getInstant(startTimeCol)!!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,21 @@
* SOFTWARE.
*/

package org.opendc.compute.simulator.failure
description = "OpenDC Failure Service implementation"

import org.opendc.compute.service.ComputeService
import java.time.InstantSource
import java.util.random.RandomGenerator
import kotlin.coroutines.CoroutineContext
// Build configuration
plugins {
`kotlin-library-conventions`
}

/**
* Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts.
*/
public interface FailureModel {
/**
* Construct a [HostFaultInjector] for the specified [service].
*/
public fun createInjector(
context: CoroutineContext,
clock: InstantSource,
service: ComputeService,
random: RandomGenerator,
): HostFaultInjector
dependencies {
api(projects.opendcCompute.opendcComputeApi)
implementation(projects.opendcCommon)
implementation(project(mapOf("path" to ":opendc-trace:opendc-trace-api")))
implementation(project(mapOf("path" to ":opendc-simulator:opendc-simulator-compute")))
implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-service")))
implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-simulator")))

api(libs.commons.math3)
implementation(libs.kotlin.logging)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,22 @@
* SOFTWARE.
*/

package org.opendc.compute.simulator.failure
package org.opendc.compute.failure.hostfault

import org.opendc.compute.service.ComputeService
import org.opendc.compute.simulator.SimHost
import java.time.InstantSource

/**
* Interface responsible for applying the fault to a host.
*/
public interface HostFault {
public abstract class HostFault(
private val service: ComputeService,
) {
/**
* Apply the fault to the specified [victims].
*/
public suspend fun apply(
clock: InstantSource,
public abstract suspend fun apply(
victims: List<SimHost>,
faultDuration: Long,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,39 +20,43 @@
* SOFTWARE.
*/

package org.opendc.compute.simulator.failure
package org.opendc.compute.failure.hostfault

import kotlinx.coroutines.delay
import org.apache.commons.math3.distribution.RealDistribution
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.service.ComputeService
import org.opendc.compute.simulator.SimHost
import java.time.InstantSource
import kotlin.math.roundToLong
import org.opendc.simulator.compute.workload.SimWorkload

/**
* A type of [HostFault] where the hosts are stopped and recover after some random amount of time.
* A type of [HostFault] where the hosts are stopped and recover after a given amount of time.
*/
public class StartStopHostFault(private val duration: RealDistribution) : HostFault {
public class StartStopHostFault(
private val service: ComputeService,
) : HostFault(service) {
override suspend fun apply(
clock: InstantSource,
victims: List<SimHost>,
faultDuration: Long,
) {
val client: ComputeClient = service.newClient()

for (host in victims) {
host.fail()
}
val servers = host.instances

val df = (duration.sample() * 1000).roundToLong() // seconds to milliseconds
val snapshots = servers.map { (it.meta["workload"] as SimWorkload).snapshot() }
host.fail()

// Handle long overflow
if (clock.millis() + df <= 0) {
return
for ((server, snapshot) in servers.zip(snapshots)) {
client.rescheduleServer(server, snapshot)
}
}

delay(df)
delay(faultDuration)

for (host in victims) {
host.recover()
}
}

override fun toString(): String = "StartStopHostFault[$duration]"
override fun toString(): String = "StartStopHostFault"
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,44 +20,39 @@
* SOFTWARE.
*/

package org.opendc.compute.simulator.internal
package org.opendc.compute.failure.models

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import org.apache.commons.math3.distribution.RealDistribution
import org.opendc.compute.failure.hostfault.HostFault
import org.opendc.compute.failure.hostfault.StartStopHostFault
import org.opendc.compute.failure.victimselector.StochasticVictimSelector
import org.opendc.compute.service.ComputeService
import org.opendc.compute.simulator.SimHost
import org.opendc.compute.simulator.failure.HostFault
import org.opendc.compute.simulator.failure.HostFaultInjector
import org.opendc.compute.simulator.failure.VictimSelector
import java.time.InstantSource
import java.util.random.RandomGenerator
import kotlin.coroutines.CoroutineContext
import kotlin.math.roundToLong

/**
* Internal implementation of the [HostFaultInjector] interface.
*
* @param context The scope to run the fault injector in.
* @param clock The [InstantSource] to keep track of simulation time.
* @param hosts The set of hosts to inject faults into.
* @param iat The inter-arrival time distribution of the failures (in hours).
* @param selector The [VictimSelector] to select the host victims.
* @param fault The type of [HostFault] to inject.
* Factory interface for constructing [FailureModel] for modeling failures of compute service hosts.
*/
internal class HostFaultInjectorImpl(
private val context: CoroutineContext,
private val clock: InstantSource,
private val hosts: Set<SimHost>,
private val iat: RealDistribution,
private val selector: VictimSelector,
private val fault: HostFault,
) : HostFaultInjector {
/**
* The scope in which the injector runs.
*/
private val scope = CoroutineScope(context + Job())
public abstract class FailureModel(
context: CoroutineContext,
protected val clock: InstantSource,
protected val service: ComputeService,
protected val random: RandomGenerator,
) : AutoCloseable {
protected val scope: CoroutineScope = CoroutineScope(context + Job())

// TODO: could at some point be extended to different types of faults
protected val fault: HostFault = StartStopHostFault(service)

// TODO: could at some point be extended to different types of victim selectors
protected val victimSelector: StochasticVictimSelector = StochasticVictimSelector(random)

protected val hosts: Set<SimHost> = service.hosts.map { it as SimHost }.toSet()

/**
* The [Job] that awaits the nearest fault in the system.
Expand All @@ -67,7 +62,7 @@ internal class HostFaultInjectorImpl(
/**
* Start the fault injection into the system.
*/
override fun start() {
public fun start() {
if (job != null) {
return
}
Expand All @@ -79,25 +74,7 @@ internal class HostFaultInjectorImpl(
}
}

/**
* Converge the injection process.
*/
private suspend fun runInjector() {
while (true) {
// Make sure to convert delay from hours to milliseconds
val d = (iat.sample() * 3.6e6).roundToLong()

// Handle long overflow
if (clock.millis() + d <= 0) {
return
}

delay(d)

val victims = selector.select(hosts)
fault.apply(clock, victims)
}
}
public abstract suspend fun runInjector()

/**
* Stop the fault injector.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (c) 2024 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package org.opendc.compute.failure.models

import kotlinx.coroutines.delay
import org.apache.commons.math3.distribution.RealDistribution
import org.opendc.compute.service.ComputeService
import java.time.InstantSource
import java.util.random.RandomGenerator
import kotlin.coroutines.CoroutineContext
import kotlin.math.max
import kotlin.math.min
import kotlin.math.roundToLong

/**
* Sample based failure model
*
* @property context
* @property clock
* @property service
* @property random
* @property iatSampler A distribution from which the time until the next fault is sampled in ms
* @property durationSampler A distribution from which the duration of a fault is sampled in s
* @property nohSampler A distribution from which the number of hosts that fault is sampled.
*/
public class SampleBasedFailureModel(
context: CoroutineContext,
clock: InstantSource,
service: ComputeService,
random: RandomGenerator,
private val iatSampler: RealDistribution,
private val durationSampler: RealDistribution,
private val nohSampler: RealDistribution,
) : FailureModel(context, clock, service, random) {
override suspend fun runInjector() {
while (true) {
val iatSample = max(0.0, iatSampler.sample())
val intervalDuration = (iatSample * 3.6e6).roundToLong()

// Handle long overflow
if (clock.millis() + intervalDuration <= 0) {
return
}

delay(intervalDuration)

val numberOfHosts = min(1.0, max(0.0, nohSampler.sample()))
val victims = victimSelector.select(hosts, numberOfHosts)

val durationSample = max(0.0, durationSampler.sample())
val faultDuration = (durationSample * 3.6e6).toLong()
fault.apply(victims, faultDuration)

break
}
}
}
Loading
Loading