Skip to content

Commit

Permalink
feat(queue): update delivery time on runtask
Browse files Browse the repository at this point in the history
  • Loading branch information
emjburns committed Oct 17, 2017
1 parent 47d6323 commit 9d6cfd1
Show file tree
Hide file tree
Showing 14 changed files with 273 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ default <T extends Execution<T>> void restart(
throw new UnsupportedOperationException();
}

default <T extends Execution<T>> void reschedule(
@Nonnull T execution) throws Exception {
throw new UnsupportedOperationException();
}

default <T extends Execution<T>> void unpause(
@Nonnull T execution) throws Exception {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.springframework.scheduling.annotation.Scheduled
import redis.clients.jedis.Jedis
import redis.clients.jedis.JedisCommands
import redis.clients.jedis.Transaction
import redis.clients.jedis.params.sortedset.ZAddParams
import redis.clients.util.Pool
import java.io.IOException
import java.nio.charset.Charset
Expand Down Expand Up @@ -110,6 +111,19 @@ class RedisQueue(
}
}

override fun reschedule(message: Message, delay: TemporalAmount) {
pool.resource.use { redis ->
val fingerprint = message.hash()
log.debug("Re-scheduling message: $message, fingerprint: $fingerprint to deliver in $delay")
val status: Long = redis.zadd(queueKey, score(delay), fingerprint, ZAddParams.zAddParams().xx())
if (status.toInt() == 1){
fire<MessageRescheduled>(message)
} else {
fire<MessageNotFound>(message)
}
}
}

@Scheduled(fixedDelayString = "\${queue.retry.frequency.ms:10000}")
override fun retry() {
pool.resource.use { redis ->
Expand Down
12 changes: 12 additions & 0 deletions orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/Message.kt
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ data class RunTask(

constructor(message: TaskLevel, taskType: Class<out Task>) :
this(message.executionType, message.executionId, message.application, message.stageId, message.taskId, taskType)

constructor(source: ExecutionLevel, stageId: String, taskId: String, taskType: Class<out Task>) :
this(source.executionType, source.executionId, source.application, stageId, taskId, taskType)
}

data class StartStage(
Expand Down Expand Up @@ -292,6 +295,15 @@ data class StartExecution(
this(source.javaClass, source.getId(), source.getApplication())
}

data class RescheduleExecution(
override val executionType: Class<out Execution<*>>,
override val executionId: String,
override val application: String
) : Message(), ExecutionLevel {
constructor(source: Execution<*>) :
this(source.javaClass, source.getId(), source.getApplication())
}

data class CompleteExecution(
override val executionType: Class<out Execution<*>>,
override val executionId: String,
Expand Down
10 changes: 10 additions & 0 deletions orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/Queue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ interface Queue {
*/
fun push(message: Message, delay: TemporalAmount): Unit

/**
* Update [message] if it exists for immediate delivery.
*/
fun reschedule(message: Message): Unit = reschedule(message, ZERO)

/**
* Update [mesasge] if it exists for delivery after [delay].
*/
fun reschedule(message: Message, delay: TemporalAmount): Unit

/**
* Check for any un-acknowledged messages that are overdue and move them back
* onto the queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class QueueExecutionRunner(
override fun <T : Execution<T>> start(execution: T) =
queue.push(StartExecution(execution))

override fun <T : Execution<T>> reschedule(execution: T) {
queue.push(RescheduleExecution(execution))
}

override fun <T : Execution<T>> restart(execution: T, stageId: String) {
queue.push(RestartStage(execution, stageId, AuthenticatedRequest.getSpinnakerUser().orElse(null)))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.q.handler

import com.netflix.spinnaker.orca.ExecutionStatus
import com.netflix.spinnaker.orca.Task
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.*
import org.springframework.stereotype.Component

@Component
class RescheduleExecutionHandler(
override val queue: Queue,
override val repository: ExecutionRepository
) : MessageHandler<RescheduleExecution> {

override val messageType = RescheduleExecution::class.java

@Suppress("UNCHECKED_CAST")
override fun handle(message: RescheduleExecution) {
message.withExecution { execution ->
execution
.getStages()
.filter { it.getStatus() == ExecutionStatus.RUNNING }
.forEach { stage ->
stage.getTasks()
.filter { it.status == ExecutionStatus.RUNNING }
.forEach {
queue.reschedule(RunTask(message,
stage.getId(),
it.id,
Class.forName(it.implementingClass) as Class<out Task>
))
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ class InMemoryQueue(
}
}

override fun reschedule(message: Message, delay: TemporalAmount) {
val existed = queue.removeIf { it.payload == message }
if (existed) {
queue.put(Envelope(message, clock.instant().plus(delay), clock))
}
}

@Scheduled(fixedDelayString = "\${queue.retry.frequency.ms:10000}")
override fun retry() {
val now = clock.instant()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ open class AtlasQueueMonitor
is MessageDead -> event.counter.increment()
is MessageDuplicate -> event.counter.increment()
is LockFailed -> event.counter.increment()
is MessageRescheduled -> event.counter.increment()
is MessageNotFound -> event.counter.increment()
}
}

Expand Down Expand Up @@ -155,4 +157,18 @@ open class AtlasQueueMonitor
*/
private val LockFailed.counter: Counter
get() = registry.counter("queue.lock.failed")

/**
* Count of attempted message rescheduling that succeeded (in other words,
* that message existed on the queue).
*/
private val MessageRescheduled.counter: Counter
get() = registry.counter("queue.reschedule.succeeded")

/**
* Count of attempted message rescheduling that failed (in other words,
* that message did not exist on the queue).
*/
private val MessageNotFound.counter: Counter
get() = registry.counter("queue.message.notfound")
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ inline fun <reified E : QueueEvent> MonitorableQueue.fire(message: Message? = nu
LockFailed::class -> LockFailed(this)
MessagePushed::class -> MessagePushed(this, message!!)
MessageDuplicate::class -> MessageDuplicate(this, message!!)
MessageRescheduled::class -> MessageRescheduled(this, message!!)
MessageNotFound::class -> MessageNotFound(this, message!!)
else -> throw IllegalArgumentException("Unknown event type ${E::class}")
}
publisher.publishEvent(event)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ class MessageRetried(source: MonitorableQueue) : QueueEvent(source)
class MessageDead(source: MonitorableQueue) : QueueEvent(source)
class MessageDuplicate(source: MonitorableQueue, payload: Message) : PayloadQueueEvent(source, payload)
class LockFailed(source: MonitorableQueue) : QueueEvent(source)

class MessageRescheduled(source: MonitorableQueue, payload: Message) : PayloadQueueEvent(source, payload)
class MessageNotFound(source: MonitorableQueue, payload: Message) : PayloadQueueEvent(source, payload)
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ class TrafficShapingQueue(

override fun push(message: Message, delay: TemporalAmount) = queueImpl.push(message, delay)

override fun reschedule(message: Message, delay: TemporalAmount) = queueImpl.reschedule(message, delay)

override fun retry() {
queueImpl.retry()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.threeten.extra.Hours
import java.io.Closeable
import java.time.Clock
import java.time.Duration
import java.time.Duration.ZERO

abstract class QueueTest<out Q : Queue>(
createQueue: (Clock, DeadMessageCallback) -> Q,
Expand Down Expand Up @@ -333,6 +334,53 @@ abstract class QueueTest<out Q : Queue>(
}
}

and("the message delivery time is updated") {
val delay = Hours.of(1)

beforeGroup {
queue = createQueue(clock, deadLetterCallback).apply {
push(message, delay)
reschedule(message, ZERO)
}
}

afterGroup(::stopQueue)
afterGroup(::resetMocks)

on("polling the queue") {
queue!!.poll(callback)
}

it("delivers the message immediately and only once") {
verify(callback).invoke(eq(message), any())
}

it("does not deliver again"){
verifyNoMoreInteractions(callback)
}
}

and("the delivery time for a message that isn't on the queue isn't updated") {
val message2 = StartExecution(Pipeline::class.java, "2", "bar")

beforeGroup {
queue = createQueue(clock, deadLetterCallback).apply {
reschedule(message2, ZERO)
}
}

afterGroup(::stopQueue)
afterGroup(::resetMocks)

on("polling the queue") {
queue!!.poll(callback)
}

it("there are no messages on the queue"){
verifyNoMoreInteractions(callback)
}
}

and("a different message is pushed before acknowledging the first") {
val newMessage = message.copy(executionId = "2")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.q.handler

import com.netflix.spinnaker.orca.ExecutionStatus
import com.netflix.spinnaker.orca.Task
import com.netflix.spinnaker.orca.pipeline.model.Pipeline
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.*
import com.nhaarman.mockito_kotlin.*
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.lifecycle.CachingMode
import org.jetbrains.spek.subject.SubjectSpek

object RescheduleExecutionHandlerTest : SubjectSpek<RescheduleExecutionHandler>({

val queue: Queue = mock()
val repository: ExecutionRepository = mock()

subject(CachingMode.GROUP) {
RescheduleExecutionHandler(queue, repository)
}

fun resetMocks() = reset(queue, repository)

describe("reschedule an execution") {
val pipeline = pipeline {
application = "spinnaker"
status = ExecutionStatus.RUNNING
stage {
refId = "1"
status = ExecutionStatus.SUCCEEDED
}
stage {
refId = "2a"
requisiteStageRefIds = listOf("1")
status = ExecutionStatus.RUNNING
task {
id = "4"
status = ExecutionStatus.RUNNING
}
}
stage {
refId = "2b"
requisiteStageRefIds = listOf("1")
status = ExecutionStatus.RUNNING
task {
id = "5"
status = ExecutionStatus.RUNNING
}
}
stage {
refId = "3"
requisiteStageRefIds = listOf("2a", "2b")
status = ExecutionStatus.NOT_STARTED
}
}
val message = RescheduleExecution(Pipeline::class.java, pipeline.id, pipeline.application)

beforeGroup {
whenever(repository.retrievePipeline(pipeline.id)) doReturn pipeline
}

afterGroup(::resetMocks)

action("the handler receives a message") {
subject.handle(message)
}

@Suppress("UNCHECKED_CAST")
it("it updates the time for each running task") {
val stage2a = pipeline.stageByRef("2a")
val stage2b = pipeline.stageByRef("2b")
val task4 = stage2a.taskById("4")
val task5 = stage2b.taskById("5")

verify(queue).reschedule(RunTask(message, stage2a.id, task4.id, Class.forName(task4.implementingClass) as Class<out Task>))
verify(queue).reschedule(RunTask(message, stage2b.id, task5.id, Class.forName(task5.implementingClass) as Class<out Task>))
verifyNoMoreInteractions(queue)
}
}
})
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ class TaskController {
@ResponseStatus(HttpStatus.ACCEPTED)
void pause(@PathVariable String id) {
executionRepository.pause(id, AuthenticatedRequest.getSpinnakerUser().orElse("anonymous"))
def pipeline = executionRepository.retrievePipeline(id)
executionRunner.reschedule(pipeline)
}

@PreAuthorize("hasPermission(this.getPipeline(#id)?.application, 'APPLICATION', 'WRITE')")
Expand Down

0 comments on commit 9d6cfd1

Please sign in to comment.