From 3821bc8f89e5d1e83e20b46527a7c3564d82a371 Mon Sep 17 00:00:00 2001 From: Andrew Kaplanov Date: Wed, 24 Jan 2018 15:56:10 +0500 Subject: [PATCH] Add LinkedLogics --- .../contrib/linkedlogics/LinkedLogics.scala | 109 ++++ .../linkedlogics/impl/LogicsCollector.scala | 58 ++ .../linkedlogics/impl/SlotsCollector.scala | 155 +++++ .../impl/slots/EnteringSlot.scala | 63 ++ .../linkedlogics/impl/slots/InputSlot.scala | 40 ++ .../linkedlogics/impl/slots/OutputSlot.scala | 191 +++++++ .../linkedlogics/impl/slots/TimerSlot.scala | 37 ++ .../linkedlogics/logics/EnteringLogic.scala | 151 +++++ .../linkedlogics/logics/InputLogic.scala | 21 + .../linkedlogics/logics/LogicLink.scala | 26 + .../linkedlogics/logics/TimerLogic.scala | 66 +++ .../linkedlogics/LinkedLogicTest.scala | 538 ++++++++++++++++++ .../linkedlogics/TimerLinkedLogicTest.scala | 166 ++++++ 13 files changed, 1621 insertions(+) create mode 100644 contrib/src/main/scala/akka/stream/contrib/linkedlogics/LinkedLogics.scala create mode 100644 contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/LogicsCollector.scala create mode 100644 contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/SlotsCollector.scala create mode 100644 contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/slots/EnteringSlot.scala create mode 100644 contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/slots/InputSlot.scala create mode 100644 contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/slots/OutputSlot.scala create mode 100644 contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/slots/TimerSlot.scala create mode 100644 contrib/src/main/scala/akka/stream/contrib/linkedlogics/logics/EnteringLogic.scala create mode 100644 contrib/src/main/scala/akka/stream/contrib/linkedlogics/logics/InputLogic.scala create mode 100644 contrib/src/main/scala/akka/stream/contrib/linkedlogics/logics/LogicLink.scala create mode 100644 contrib/src/main/scala/akka/stream/contrib/linkedlogics/logics/TimerLogic.scala create mode 100644 contrib/src/test/scala/akka/stream/contrib/linkedlogics/LinkedLogicTest.scala create mode 100644 contrib/src/test/scala/akka/stream/contrib/linkedlogics/TimerLinkedLogicTest.scala diff --git a/contrib/src/main/scala/akka/stream/contrib/linkedlogics/LinkedLogics.scala b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/LinkedLogics.scala new file mode 100644 index 0000000..5fab14b --- /dev/null +++ b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/LinkedLogics.scala @@ -0,0 +1,109 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.stream.stage.linkedlogic + +import akka.actor.ActorSystem +import akka.event.Logging +import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic } +import akka.stream.stage.linkedlogic.impl.SlotsCollector.LogicInterface +import akka.stream.stage.linkedlogic.impl.TimerSlotsCollector.TimerLogicInterface +import akka.stream.stage.linkedlogic.impl.{ TimerLogicsCollector, _ } +import akka.stream.stage.linkedlogic.logics.{ InputLogic, OnceScheduledTimerLogic, PeriodicallyTimerLogic, TimerLogic } +import akka.stream.{ Inlet, Outlet, Shape } + +import scala.concurrent.duration.FiniteDuration + +/** + * Represents processing logics with defined links between inlets and outlets. + * We must to define the logic for all inlets in the shape before LinkedLogics started. + */ +final class LinkedLogics(shape: Shape)(implicit system: ActorSystem) extends GraphStageLogic(shape) { self ⇒ + private val log = Logging(system, getClass) + + private val slots = new SlotsCollector(shape, new LogicInterface { + override def cancel[T](in: Inlet[T]): Unit = self.cancel(in) + override def setHandler(in: Inlet[_], handler: InHandler): Unit = self.setHandler(in, handler) + override def setHandler(out: Outlet[_], handler: OutHandler): Unit = self.setHandler(out, handler) + override def isAvailable[T](in: Inlet[T]): Boolean = self.isAvailable(in) + override def isAvailable[T](out: Outlet[T]): Boolean = self.isAvailable(out) + override def hasBeenPulled[T](in: Inlet[T]): Boolean = self.hasBeenPulled(in) + override def push[T](out: Outlet[T], elem: T): Unit = self.push(out, elem) + override def tryPull[T](in: Inlet[T]): Unit = self.tryPull(in) + override def completeStage(): Unit = self.completeStage() + override def grab[T](in: Inlet[T]): T = self.grab(in) + }) + + private val logics = new InputLogicsCollector() + + override def preStart(): Unit = { + logics.start(slots) + super.preStart() + } + + /** + * Adds logic of processing elements from the inlet. + */ + def add[In](logic: InputLogic[In]): InputLogic[In] = { + logics.addInputLogic(logic) + logic + } +} + +/** + * Represents processing logics with predefined links between inlets, timers and outlets. + */ +final class TimerLinkedLogics(shape: Shape)(implicit system: ActorSystem) extends TimerGraphStageLogic(shape) { self ⇒ + private val log = Logging(system, getClass) + + private val slots = new TimerSlotsCollector(shape, new TimerLogicInterface { + override def cancel[T](in: Inlet[T]): Unit = self.cancel(in) + override def setHandler(in: Inlet[_], handler: InHandler): Unit = self.setHandler(in, handler) + override def setHandler(out: Outlet[_], handler: OutHandler): Unit = self.setHandler(out, handler) + override def isAvailable[T](in: Inlet[T]): Boolean = self.isAvailable(in) + override def isAvailable[T](out: Outlet[T]): Boolean = self.isAvailable(out) + override def hasBeenPulled[T](in: Inlet[T]): Boolean = self.hasBeenPulled(in) + override def push[T](out: Outlet[T], elem: T): Unit = self.push(out, elem) + override def tryPull[T](in: Inlet[T]): Unit = self.pull(in) + override def completeStage(): Unit = self.completeStage() + override def grab[T](in: Inlet[T]): T = self.grab(in) + override def scheduleOnce(timerKey: Any, delay: FiniteDuration): Unit = self.scheduleOnce(timerKey, delay) + override def schedulePeriodically(timerKey: Any, interval: FiniteDuration): Unit = self.schedulePeriodically(timerKey, interval) + override def schedulePeriodicallyWithInitialDelay(timerKey: Any, initialDelay: FiniteDuration, interval: FiniteDuration): Unit = + self.schedulePeriodicallyWithInitialDelay(timerKey, initialDelay, interval) + override def cancelTimer(timerKey: Any): Unit = self.cancelTimer(timerKey) + }) + + private val logics = new TimerLogicsCollector() + + override def preStart(): Unit = { + logics.start(slots) + super.preStart() + } + + /** + * Adds logic of processing elements from inlet. + */ + def add[In](logic: InputLogic[In]): InputLogic[In] = { + logics.addInputLogic(logic) + if (logics.isStarted()) { + logic.start(slots) + } + logic + } + + /** + * Adds logic of processing elements from the timer. + */ + def add[Logic <: TimerLogic](logic: Logic): Logic = { + logics.addTimerLogic(logic) + if (logics.isStarted()) { + logic.start(slots) + } + logic + } + + protected final override def onTimer(timerKey: Any): Unit = { + slots.onTimer(timerKey) + } +} diff --git a/contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/LogicsCollector.scala b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/LogicsCollector.scala new file mode 100644 index 0000000..f1bd681 --- /dev/null +++ b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/LogicsCollector.scala @@ -0,0 +1,58 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.stream.stage.linkedlogic.impl + +import akka.stream.{ Inlet, Shape } +import akka.stream.stage.linkedlogic.logics.{ InputLogic, TimerLogic } + +/** + * Collector of input logics. + * Input logics can be added initially or in progress. + */ +private[linkedlogic] class InputLogicsCollector { + private var inputLogics = Map.empty[Inlet[_], InputLogic[_]] + private var started = false + + def isStarted() = started + + def start(slots: SlotsCollector): Unit = { + slots.shape.inlets.foreach(inlet ⇒ { + if (!inputLogics.isDefinedAt(inlet)) { + sys.error(s"Logic of inlet ${inlet} is not defined") + } + }) + inputLogics.values.foreach(_.start(slots)) + started = true + } + + def addInputLogic[In](logic: InputLogic[In]): Unit = { + if (started) { + sys.error("Input logic can not be added after start") + } + if (inputLogics.isDefinedAt(logic.inlet)) { + sys.error(s"Logic of ${logic.inlet} is already defined") + } + inputLogics += (logic.inlet -> logic) + logic.setOnTerminateHandler(() ⇒ inputLogics -= logic.inlet) + } +} + +/** + * Collector of input and timer logics. + * Timer logics can be added initially or in progress. + * It is possible also to remove timer logic. + */ +private[linkedlogic] final class TimerLogicsCollector extends InputLogicsCollector { + private var timerLogics = Set.empty[TimerLogic] + + def start(slots: TimerSlotsCollector): Unit = { + timerLogics.foreach(_.start(slots)) + super.start(slots) + } + + def addTimerLogic[Logic <: TimerLogic](logic: Logic): Unit = { + timerLogics += logic + logic.setOnTerminateHandler(() ⇒ timerLogics -= logic) + } +} diff --git a/contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/SlotsCollector.scala b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/SlotsCollector.scala new file mode 100644 index 0000000..defac9c --- /dev/null +++ b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/SlotsCollector.scala @@ -0,0 +1,155 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.stream.stage.linkedlogic.impl + +import akka.actor.ActorSystem +import akka.event.Logging +import akka.stream.stage.linkedlogic.impl.SlotsCollector.LogicInterface +import akka.stream.stage.linkedlogic.impl.TimerSlotsCollector.TimerLogicInterface +import akka.stream.stage.linkedlogic.impl.slots.{ EnteringSlot, InputSlot, OutputSlot, TimerSlot } +import akka.stream.stage.{ InHandler, OutHandler } +import akka.stream.{ Inlet, Outlet, Shape } + +import scala.concurrent.duration.FiniteDuration + +/** + * Represents the collection of entering and output slots. + * Slots for the inlets and outlets are created initially. + */ +private[linkedlogic] class SlotsCollector(val shape: Shape, logic: LogicInterface)(implicit system: ActorSystem) { + private val log = Logging(system, getClass) + + protected var enteringSlotIdSequence = 0 + protected var outputSlotIdSequence = 0 + + protected var enteringSlots: Map[Int, EnteringSlot] = makeEnteringSlots() + protected var outputSlots: Map[Int, OutputSlot[_]] = makeOutputSlots() + + private def makeEnteringSlots() = { + var inSlots = Map.empty[Int, EnteringSlot] + for (index ← 0 until shape.getInlets.size()) { + val inlet = shape.getInlets.get(index) + val slot = new InputSlot[Any](enteringSlotIdSequence, () ⇒ logic.isAvailable(inlet), () ⇒ logic.grab(inlet), + () ⇒ if (!logic.hasBeenPulled(inlet)) logic.tryPull(inlet)) + enteringSlotIdSequence += 1 + logic.setHandler(inlet, slot.makeHandler()) + inSlots += (slot.id -> slot) + } + inSlots + } + + private def makeOutputSlots() = { + var outSlots = Map.empty[Int, OutputSlot[_]] + for (index ← 0 until shape.getOutlets.size()) { + val outlet = shape.getOutlets.get(index).asInstanceOf[Outlet[Any]] + val slot = new OutputSlot[Any](outputSlotIdSequence, outlet.toString(), () ⇒ logic.isAvailable(outlet), p ⇒ logic.push(outlet, p), + (inboundIndex, available) ⇒ enteringSlots(inboundIndex).notifyAvailable(index, available), () ⇒ logic.completeStage()) + outputSlotIdSequence += 1 + logic.setHandler(outlet, slot.makeHandler()) + outSlots += (slot.id -> slot) + } + outSlots + } + + def getEnteringSlot[In](slotId: Int) = enteringSlots.get(slotId) + + def getOutputSlot[Out](slotId: Int) = outputSlots.get(slotId) + + def getInputSlot[In](inlet: Inlet[In]) = { + val index = shape.getInlets.indexOf(inlet) + if (index == -1) { + sys.error(s"No inlet ${inlet} in shape, existing ${shape.getInlets}") + } + enteringSlots.get(index) match { + case Some(inputSlot: InputSlot[_]) ⇒ + inputSlot.asInstanceOf[InputSlot[In]] + case Some(_) ⇒ + sys.error(s"Slot ${index} is not the input") + case None ⇒ + sys.error(s"No input slot ${index}") + } + } + + def getOutputSlot[Out](outlet: Outlet[Out]) = { + val index = shape.getOutlets.indexOf(outlet) + if (index == -1) { + sys.error(s"No outlet ${outlet} in shape, existing ${shape.getOutlets}") + } + outputSlots.get(index) match { + case Some(outoutSlot) ⇒ + outputSlots(index).asInstanceOf[OutputSlot[Out]] + case None ⇒ + sys.error(s"No output slot ${index}") + } + } + + def removeEnteringSlot(enteringSlotId: Int) = { + for (timerSlot ← enteringSlots.get(enteringSlotId)) { + for (outSlotId ← timerSlot.getLinks()) { + for (outSlot ← getOutputSlot(outSlotId)) { + outSlot.unlink(timerSlot.id) + } + } + enteringSlots -= enteringSlotId + } + } +} + +object SlotsCollector { + trait LogicInterface { + def setHandler(in: Inlet[_], handler: InHandler): Unit + def isAvailable[T](in: Inlet[T]): Boolean + def hasBeenPulled[T](in: Inlet[T]): Boolean + def tryPull[T](in: Inlet[T]): Unit + def grab[T](in: Inlet[T]): T + def cancel[T](in: Inlet[T]): Unit + def setHandler(out: Outlet[_], handler: OutHandler): Unit + def isAvailable[T](out: Outlet[T]): Boolean + def push[T](out: Outlet[T], elem: T): Unit + def completeStage(): Unit + } +} + +/** + * Represents the collection of input, output and timer slots. + * Timer slots may be added and removed in progress. + */ +private[linkedlogic] class TimerSlotsCollector(shape: Shape, logic: TimerLogicInterface)(implicit system: ActorSystem) extends SlotsCollector(shape, logic) { + private[linkedlogic] def addOnceScheduledTimerSlot(delay: FiniteDuration): TimerSlot = { + var timerSlotId = enteringSlotIdSequence; enteringSlotIdSequence += 1 + val timerSlot = new TimerSlot(timerSlotId, () ⇒ logic.scheduleOnce(timerSlotId, delay), () ⇒ logic.cancelTimer(timerSlotId)) + enteringSlots += (timerSlot.id -> timerSlot) + timerSlot + } + + def addPeriodicallyTimerSlot(interval: FiniteDuration, initialDelay: Option[FiniteDuration] = None): TimerSlot = { + var timerSlotId = enteringSlotIdSequence; enteringSlotIdSequence += 1 + val timerSlot = new TimerSlot(timerSlotId, () ⇒ initialDelay match { + case Some(initialDelay) ⇒ logic.schedulePeriodicallyWithInitialDelay(timerSlotId, initialDelay, interval) + case None ⇒ logic.schedulePeriodically(timerSlotId, interval) + }, () ⇒ logic.cancelTimer(timerSlotId)) + enteringSlots += (timerSlot.id -> timerSlot) + timerSlot + } + + def onTimer(timerKey: Any): Unit = { + val timerSlotId = timerKey.asInstanceOf[Int] + enteringSlots.get(timerSlotId) match { + case Some(timerSlot: TimerSlot) ⇒ + timerSlot.process() + case Some(_) ⇒ + sys.error(s"Slot ${timerSlotId} is not the timer") + case None ⇒ + } + } +} + +object TimerSlotsCollector { + trait TimerLogicInterface extends LogicInterface { + def scheduleOnce(timerKey: Any, delay: FiniteDuration): Unit + def schedulePeriodically(timerKey: Any, interval: FiniteDuration): Unit + def schedulePeriodicallyWithInitialDelay(timerKey: Any, initialDelay: FiniteDuration, interval: FiniteDuration): Unit + def cancelTimer(timerKey: Any): Unit + } +} diff --git a/contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/slots/EnteringSlot.scala b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/slots/EnteringSlot.scala new file mode 100644 index 0000000..82ed8c0 --- /dev/null +++ b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/slots/EnteringSlot.scala @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.stream.stage.linkedlogic.impl.slots + +/** + * Base class of logic slot for an inlet or a timer. + */ +private[linkedlogic] abstract class EnteringSlot(val id: Int) { + private var links = Set.empty[Int] + private var availableLinks = Set.empty[Int] + private var onTerminateHandler = Option.empty[() ⇒ Unit] + + protected def isPending(): Boolean + protected def process(): Unit + protected def requestNext(): Unit + + def getLinks() = links + + def setOnTerminateHandler(onTerminateHandler: () ⇒ Unit): Unit = { + this.onTerminateHandler = Some(onTerminateHandler) + } + + def link(outputSlotId: Int): Unit = { + if (links.contains(outputSlotId)) { + sys.error(s"Link to the ${outputSlotId} already exists") + } + links += outputSlotId + } + + def unlink(outputSlotId: Int): Unit = { + links -= outputSlotId + availableLinks -= outputSlotId + handleIfReadyToProcess() + } + + def notifyAvailable(outputSlotId: Int, available: Boolean): Unit = { + if (available) { + availableLinks += outputSlotId + handleIfReadyToProcess() + } else { + availableLinks -= outputSlotId + } + } + + def isReadyToProcess(): Boolean = { + links.size == availableLinks.size + } + + def close(): Unit = { + onTerminateHandler.foreach(_.apply()) + } + + private def handleIfReadyToProcess(): Unit = { + if (isReadyToProcess()) { + if (isPending()) { + process() + } else { + requestNext() + } + } + } +} diff --git a/contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/slots/InputSlot.scala b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/slots/InputSlot.scala new file mode 100644 index 0000000..518a0fb --- /dev/null +++ b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/slots/InputSlot.scala @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.stream.stage.linkedlogic.impl.slots + +import akka.stream.stage.InHandler + +/** + * Logic slot for the inlet. + */ +private[linkedlogic] final class InputSlot[In](id: Int, isAvailable: () ⇒ Boolean, grab: () ⇒ In, tryPull: () ⇒ Unit) extends EnteringSlot(id) { + private var inputHandler: (In) ⇒ Unit = null + + override protected def isPending(): Boolean = isAvailable() + + override protected def requestNext(): Unit = tryPull() + + def start(inputHandler: (In) ⇒ Unit): Unit = { + this.inputHandler = inputHandler + if (getLinks().isEmpty) { + requestNext() + } + } + + def makeHandler() = new InHandler() { + def onPush(): Unit = { + if (isReadyToProcess()) { + process() + } + } + } + + def process(): Unit = { + val packet = grab() + inputHandler(packet) + if (isReadyToProcess()) { + requestNext() + } + } +} diff --git a/contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/slots/OutputSlot.scala b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/slots/OutputSlot.scala new file mode 100644 index 0000000..931ba20 --- /dev/null +++ b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/slots/OutputSlot.scala @@ -0,0 +1,191 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.stream.stage.linkedlogic.impl.slots + +import akka.actor.ActorSystem +import akka.event.Logging +import akka.stream.OverflowStrategy +import akka.stream.stage.OutHandler +import akka.stream.stage.linkedlogic.impl.slots.OutputSlot.BufferInfo + +import scala.collection.mutable + +/** + * Logic slot for the outlet. + */ +private[linkedlogic] final class OutputSlot[Out](val id: Int, name: String, isAvailable: () ⇒ Boolean, push: (Out) ⇒ Unit, + notifyAvailable: (Int, Boolean) ⇒ Unit, completeStage: () ⇒ Unit)(implicit system: ActorSystem) { + private case class Link(info: Option[BufferInfo]) { + val queue = mutable.Queue.empty[(Long, Out)] + var removed = false + } + + private val log = Logging(system, getClass) + private var links = Map.empty[Int, Link] + private var availableLinks = Set.empty[Int] + private var enqueueSequence: Long = 0L + + def link(enteringSlotId: Int, bufferInfo: Option[BufferInfo]): Unit = { + if (links.get(enteringSlotId).isDefined) { + sys.error(s"Link from the entering slot ${enteringSlotId} to the output slot ${id} is already defined") + } + links += (enteringSlotId -> Link(bufferInfo)) + notifyAvailable(enteringSlotId, true) + } + + def unlink(enteringSlotId: Int): Unit = { + for (link ← links.get(enteringSlotId)) { + if (link.queue.isEmpty) { + availableLinks -= enteringSlotId + links -= enteringSlotId + } else { + link.removed = true + } + } + } + + def push(enteringSlotId: Int, packet: Out): Unit = { + val link = getLink(enteringSlotId) + val available = if (isAvailable()) { + assert(link.queue.isEmpty) + push(packet) + true + } else { + link.info match { + case Some(info) ⇒ + if (info.overflowStrategy == OverflowStrategy.fail) { + if (link.queue.size >= info.maxSize) { + log.error(s"Output queue to ${name} overflowed. Current size is ${link.queue.size}. Complete stage.") + completeStage() + true + } else { + enqueue(enteringSlotId, packet) + true + } + } else if (info.overflowStrategy == OverflowStrategy.backpressure) { + enqueue(enteringSlotId, packet) + if (link.queue.size >= info.maxSize) { + val portion = if (info.maxSize != 0) info.maxSize else 10 + if (link.queue.size % portion == 0) { + log.error(s"Output queue to ${name} is overflowed. Current size is ${link.queue.size}. Activate back pressure.") + } + false + } else { + true + } + } else if (info.overflowStrategy == OverflowStrategy.dropBuffer) { + if (link.queue.size >= info.maxSize) { + log.error(s"Output queue to ${name} overflowed. Clear it.") + link.queue.clear() + } + enqueue(enteringSlotId, packet) + true + } else if (info.overflowStrategy == OverflowStrategy.dropHead) { + while (link.queue.size >= info.maxSize) { + link.queue.dequeue() + } + enqueue(enteringSlotId, packet) + true + } else if (info.overflowStrategy == OverflowStrategy.dropTail) { + for (i ← 0 until link.queue.size) { + val e = link.queue.dequeue() + if (i != link.queue.size - 1) { + link.queue.enqueue(e) + } + } + enqueue(enteringSlotId, packet) + true + } else if (info.overflowStrategy == OverflowStrategy.dropNew) { + true + } else { + sys.error(s"Invalid overflow strategy ${info.overflowStrategy}") + } + case None ⇒ + enqueue(enteringSlotId, packet) + false + } + } + if (!available) { + notifyAvailable(enteringSlotId, false) + } + } + + def makeHandler() = { + new OutHandler() { + @throws[Exception](classOf[Exception]) + override def onPull(): Unit = { + for (packet ← dequeue()) { + push(packet) + } + } + } + } + + private def getLink(enteringSlotId: Int) = { + links.get(enteringSlotId) match { + case Some(link) ⇒ link + case None ⇒ sys.error(s"No link to entering slot ${enteringSlotId}") + } + } + + private def enqueue(enteringSlotId: Int, packet: Out): Unit = { + val link = getLink(enteringSlotId) + link.queue.enqueue((enqueueSequence, packet)) + enqueueSequence += 1 + if (link.queue.size == 1) { + availableLinks += enteringSlotId + } + if (link.queue.size >= 100 && link.queue.size % 10 == 0) { + log.warning(s"Output queue to ${name} size is ${link.queue.size}.") + } + } + + private def dequeue(): Option[Out] = { + var frontIndex = 0 + var frontLink = Option.empty[Link] + availableLinks.foreach { + inboundIndex ⇒ + { + val buffer = getLink(inboundIndex) + if (frontLink.isEmpty || buffer.queue.front._1 < frontLink.get.queue.front._1) { + frontIndex = inboundIndex + frontLink = Some(buffer) + } + } + } + frontLink match { + case Some(link) ⇒ + val out = link.queue.dequeue() + if (link.queue.isEmpty) { + availableLinks -= frontIndex + } + if (!link.removed) { + link.info match { + case None ⇒ + if (link.queue.isEmpty) { + notifyAvailable(frontIndex, true) + } + case Some(info) if (info.overflowStrategy == OverflowStrategy.backpressure) ⇒ + if (info.maxSize == 0) { + if (link.queue.isEmpty) { + notifyAvailable(frontIndex, true) + } + } else if (link.queue.size == info.maxSize - 1) { + notifyAvailable(frontIndex, true) + } + case _ ⇒ + } + } else if (link.queue.isEmpty) { + links -= frontIndex + } + Some(out._2) + case None ⇒ + None + } + } +} + +object OutputSlot { + case class BufferInfo(maxSize: Int, overflowStrategy: OverflowStrategy) +} diff --git a/contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/slots/TimerSlot.scala b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/slots/TimerSlot.scala new file mode 100644 index 0000000..b039de7 --- /dev/null +++ b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/impl/slots/TimerSlot.scala @@ -0,0 +1,37 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.stream.stage.linkedlogic.impl.slots + +/** + * Logic slot for the timer. + */ +private[linkedlogic] final class TimerSlot(id: Int, scheduleTimer: () ⇒ Unit, cancelTimer: () ⇒ Unit) extends EnteringSlot(id) { + private var timerHandler: () ⇒ Unit = null + private var pending = false + + override protected def isPending(): Boolean = pending + override protected def requestNext(): Unit = {} + + def setTimerHandler(timerHandler: () ⇒ Unit): Unit = { + this.timerHandler = timerHandler + } + + def start(): Unit = { + scheduleTimer() + } + + def cancel(): Unit = { + cancelTimer() + close() + } + + def process(): Unit = { + if (isReadyToProcess()) { + timerHandler() + pending = false + } else { + pending = true + } + } +} diff --git a/contrib/src/main/scala/akka/stream/contrib/linkedlogics/logics/EnteringLogic.scala b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/logics/EnteringLogic.scala new file mode 100644 index 0000000..1387836 --- /dev/null +++ b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/logics/EnteringLogic.scala @@ -0,0 +1,151 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.stream.stage.linkedlogic.logics + +import akka.stream.{ Outlet, OverflowStrategy } +import akka.stream.stage.linkedlogic.impl.SlotsCollector +import akka.stream.stage.linkedlogic.impl.slots.OutputSlot.BufferInfo +import akka.stream.stage.linkedlogic.impl.slots.{ EnteringSlot, OutputSlot } + +/** + * Defines links from an inlet or a timer to the outlets. + */ +abstract class EnteringLogic private[linkedlogic] { + case class DefferedLinks(var outputs: Map[Outlet[_], BufferInfo] = Map.empty, var stopper: Boolean = false) + private var deferredLinks = DefferedLinks() + private var slots: SlotsCollector = null + private var enteringSlot: EnteringSlot = null + private var callbacks = Map.empty[Outlet[_], (_) ⇒ Unit] + private var onTerminateHandler = Option.empty[() ⇒ Unit] + + /** + * Links to outlet. + * OutputLink is considered ready for processing element when no data in the buffer and outlet available to push. + * + * @param outlet - outlet to link with. + */ + final def linkOutput[Out](outlet: Outlet[Out]): OutputLink[Out] = { + linkOutput(outlet, 0, OverflowStrategy.backpressure) + } + + /** + * Links to the outlet. + * OutputLink is considered ready for processing element when overflow strategy allows to append new element to the buffer. + * + * @param outlet outlet to link with. + * @param bufferSize size of the buffer to control pulling elements from the input or processing events from the timer. + * @param overflowStrategy strategy of back-pressure when the buffer is overflowed. + */ + final def linkOutput[Out](outlet: Outlet[Out], bufferSize: Int, overflowStrategy: OverflowStrategy): OutputLink[Out] = { + linkOutput(outlet, BufferInfo(bufferSize, overflowStrategy)) + } + + /** + * Links array of the outlets. + * OutputLinks are considered ready for processing element when no data in the all the buffers and all the outlets available to push. + * + * @param outlets - outlets to link with. + */ + final def linkOutputs[Out](outlets: Seq[Outlet[Out]]): Seq[OutputLink[Out]] = { + outlets.map(linkOutput(_, BufferInfo(0, OverflowStrategy.backpressure))) + } + + /** + * Links array of the outlets. + * OutputLinks are considered ready for processing the element when overflow strategy allows to append new element to the buffers. + * + * @param outlets - outlets to link with. + * @param bufferSize size of buffer to control pulling elements from the the input or processing events from the timer. + * @param overflowStrategy strategy of back-pressure when the buffer is overflowed. + */ + final def linkOutputs[Out](outlets: Seq[Outlet[Out]], bufferSize: Int, overflowStrategy: OverflowStrategy): Seq[OutputLink[Out]] = { + outlets.map(linkOutput(_, BufferInfo(bufferSize, overflowStrategy))) + } + + final def linkStopper(): StopperLink = { + if (deferredLinks != null) { + deferredLinks.stopper = true + } else { + startStopper() + } + new StopperLink() { + override def remove(): Unit = { + unlinkStopper() + } + } + } + + private[linkedlogic] def start(slots: SlotsCollector, enteringSlot: EnteringSlot): Unit = { + this.slots = slots + this.enteringSlot = enteringSlot + deferredLinks.outputs.foreach { + case (outlet, bufferInfo) ⇒ + startOutput(outlet, bufferInfo) + } + if (deferredLinks.stopper) { + startStopper() + } + deferredLinks = null + } + + private[linkedlogic] def stop(): Unit = { + onTerminateHandler.foreach(_.apply()) + } + + private[linkedlogic] def setOnTerminateHandler(onTerminateHandler: () ⇒ Unit): Unit = { + this.onTerminateHandler = Some(onTerminateHandler) + } + + private def getCallback[Out](outlet: Outlet[Out]): (Out) ⇒ Unit = { + callbacks.get(outlet).get.asInstanceOf[(Out) ⇒ Unit] + } + + private def linkOutput[Out](outlet: Outlet[Out], buffer: BufferInfo): OutputLink[Out] = { + if (deferredLinks != null) { + deferredLinks.outputs += (outlet -> buffer) + } else { + startOutput(outlet, buffer) + } + new OutputLink[Out]() { + override def push(element: Out): Unit = { + getCallback(outlet).apply(element) + } + + override def remove(): Unit = { + unlinkOutput(outlet) + } + } + } + + private def startOutput[Out](outlet: Outlet[Out], bufferInfo: BufferInfo): Unit = { + val outputSlot = slots.getOutputSlot(outlet) + enteringSlot.link(outputSlot.id) + outputSlot.link(enteringSlot.id, Some(bufferInfo)) + def push = (packet: Out) ⇒ outputSlot.asInstanceOf[OutputSlot[Out]].push(enteringSlot.id, packet) + callbacks += (outlet -> push) + } + + private def startStopper(): Unit = { + enteringSlot.link(-1) + enteringSlot.notifyAvailable(-1, false) + } + + private def unlinkOutput[Out](outlet: Outlet[Out]): Unit = { + if (deferredLinks != null) { + deferredLinks.outputs -= outlet + } else { + val outputSlot = slots.getOutputSlot(outlet) + enteringSlot.unlink(outputSlot.id) + outputSlot.unlink(enteringSlot.id) + } + } + + private def unlinkStopper(): Unit = { + if (deferredLinks != null) { + deferredLinks.stopper = false + } else { + enteringSlot.unlink(-1) + } + } +} diff --git a/contrib/src/main/scala/akka/stream/contrib/linkedlogics/logics/InputLogic.scala b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/logics/InputLogic.scala new file mode 100644 index 0000000..cd66e5a --- /dev/null +++ b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/logics/InputLogic.scala @@ -0,0 +1,21 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.stream.stage.linkedlogic.logics + +import akka.stream.Inlet +import akka.stream.stage.linkedlogic.impl.SlotsCollector +import akka.stream.stage.linkedlogic.impl.slots.InputSlot + +/** + * Logic of processing elements from the the inlet. + */ +abstract class InputLogic[In] private[linkedlogic] (val inlet: Inlet[In]) extends EnteringLogic { + private[linkedlogic] def inputHandler(data: In): Unit + + private[linkedlogic] def start(slots: SlotsCollector): Unit = { + val inputSlot = slots.getInputSlot(inlet) + inputSlot.start(inputHandler) + start(slots, inputSlot) + } +} diff --git a/contrib/src/main/scala/akka/stream/contrib/linkedlogics/logics/LogicLink.scala b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/logics/LogicLink.scala new file mode 100644 index 0000000..e3b291e --- /dev/null +++ b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/logics/LogicLink.scala @@ -0,0 +1,26 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.stream.stage.linkedlogic.logics + +/** Link between an input and an output. */ +sealed trait LogicLink { + /** Terminates the link. */ + def remove(): Unit +} + +/** Link to the outlet. */ +trait OutputLink[Out] extends LogicLink { + /** + * Pushes the element to outlet. + * If the outlet is not available, adds the element to the buffer. + */ + def push(element: Out): Unit +} + +/** + * Link to the stopper. + * Leads to the back-pressure until it is removed. + */ +trait StopperLink extends LogicLink { +} diff --git a/contrib/src/main/scala/akka/stream/contrib/linkedlogics/logics/TimerLogic.scala b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/logics/TimerLogic.scala new file mode 100644 index 0000000..8cecd17 --- /dev/null +++ b/contrib/src/main/scala/akka/stream/contrib/linkedlogics/logics/TimerLogic.scala @@ -0,0 +1,66 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.stream.stage.linkedlogic.logics + +import akka.stream.stage.linkedlogic.impl.slots.TimerSlot +import akka.stream.stage.linkedlogic.impl.{ SlotsCollector, TimerSlotsCollector } + +import scala.concurrent.duration.FiniteDuration + +/** + * Logic of processing elements from the timer. + */ +abstract class TimerLogic private[linkedlogic] () extends EnteringLogic() { + private var timerSlot: TimerSlot = null + + private[linkedlogic] def start(slots: TimerSlotsCollector): Unit = { + timerSlot = makeSlot(slots) + timerSlot.start() + start(slots, timerSlot) + } + + final def cancel(): Unit = { + timerSlot.cancel() + } + + private[linkedlogic] def makeSlot(slots: TimerSlotsCollector): TimerSlot + + protected[linkedlogic] def timerHandler(): Unit +} + +/** + * Once scheduled timer. + * Time for the start of processing depends on the readiness of the links. + * So, if the links are not ready to accept a new element, timer processing call may be delayed. + * + * @param delay minimal delay before timer processing started. + */ +abstract class OnceScheduledTimerLogic private[linkedlogic] (delay: FiniteDuration) extends TimerLogic() { + override private[linkedlogic] final def makeSlot(slots: TimerSlotsCollector): TimerSlot = { + val timerSlot = slots.addOnceScheduledTimerSlot(delay) + timerSlot.setTimerHandler(() ⇒ { + timerHandler() + timerSlot.close() + }) + timerSlot.setOnTerminateHandler(() ⇒ { slots.removeEnteringSlot(timerSlot.id) }) + timerSlot + } +} + +/** + * Periodically executed timer. + * Time for the start of processing depends on the readiness of the links. + * So, if the links are not ready to accept a new element, timer processing call may be delayed. + * + * @param period minimal period between timer processing. + * @param initialDelay initial delay before first timer call. + */ +abstract class PeriodicallyTimerLogic private[linkedlogic] (period: FiniteDuration, initialDelay: Option[FiniteDuration] = None) extends TimerLogic() { + override private[linkedlogic] final def makeSlot(slots: TimerSlotsCollector): TimerSlot = { + val timerSlot = slots.addPeriodicallyTimerSlot(period, initialDelay) + timerSlot.setTimerHandler(timerHandler) + timerSlot.setOnTerminateHandler(() ⇒ { slots.removeEnteringSlot(timerSlot.id) }) + timerSlot + } +} diff --git a/contrib/src/test/scala/akka/stream/contrib/linkedlogics/LinkedLogicTest.scala b/contrib/src/test/scala/akka/stream/contrib/linkedlogics/LinkedLogicTest.scala new file mode 100644 index 0000000..45fe192 --- /dev/null +++ b/contrib/src/test/scala/akka/stream/contrib/linkedlogics/LinkedLogicTest.scala @@ -0,0 +1,538 @@ +package akka.stream.stage.linkedlogic + +import java.util.concurrent.TimeUnit + +import akka.actor.ActorSystem +import akka.stream._ +import akka.stream.scaladsl.{ GraphDSL, RunnableGraph } +import akka.stream.stage.linkedlogic.logics.{ InputLogic, OutputLink } +import akka.stream.stage.{ GraphStage, GraphStageLogic } +import akka.stream.testkit.scaladsl.{ TestSink, TestSource } +import akka.testkit.{ ImplicitSender, TestKit } +import org.scalatest.{ BeforeAndAfterAll, FlatSpecLike, Matchers } + +import scala.collection.immutable +import scala.concurrent.Await +import scala.concurrent.duration._ + +class LinkedLogicTest(sys: ActorSystem) extends TestKit(sys) + with ImplicitSender with Matchers with FlatSpecLike with BeforeAndAfterAll { + + def this() = this(ActorSystem("LinkedLogicTest")) + + implicit val materializer = ActorMaterializer( + ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 1, maxSize = 1) + ) + + val log = sys.log + + override def afterAll: Unit = { + sys.terminate() + Await.result(sys.whenTerminated, FiniteDuration.apply(10, TimeUnit.SECONDS)) + } + + val maxBufferSize = 10 + + behavior of "LinkedLogic" + + it should "pass through hard link" in { + val (in, out) = materializeFlow(new FlowStage(None)) + + out.ensureSubscription() + in.ensureSubscription() + + in.sendNext("dummy1") + out.requestNext("dummy1") + in.sendNext("dummy2") + out.requestNext("dummy2") + + in.expectRequest() + in.expectNoMessage(FiniteDuration.apply(1, TimeUnit.SECONDS)) + } + + it should "back pressure on hard link" in { + val (in, out) = materializeFlow(new FlowStage(None)) + + out.ensureSubscription() + in.ensureSubscription() + + in.expectRequest() + in.sendNext("dummy1") + + in.expectRequest() + in.sendNext("dummy2") + + in.expectNoMessage(FiniteDuration.apply(1, TimeUnit.SECONDS)) + } + + it should "buffer data on buffered link" in { + val (in, out) = materializeFlow(new FlowStage(Some(BufferInfo(maxBufferSize, OverflowStrategy.backpressure)))) + + out.ensureSubscription() + in.ensureSubscription() + + for (i ← 0 until maxBufferSize) { + in.expectRequest() + in.sendNext(i.toString) + } + + for (i ← 0 until maxBufferSize) { + out.requestNext(i.toString) + } + } + + it should "back pressure on buffered link when buffer is full" in { + val (in, out) = materializeFlow(new FlowStage(Some(BufferInfo(maxBufferSize, OverflowStrategy.backpressure)))) + + out.ensureSubscription() + in.ensureSubscription() + + for (i ← 0 until maxBufferSize) { + in.sendNext(i.toString) + } + + in.expectRequest() + in.sendNext("dummy") + + in.expectNoMessage(FiniteDuration.apply(1, TimeUnit.SECONDS)) + } + + it should "complete flow when buffer overflow with fail strategy" in { + val (in, out) = materializeFlow(new FlowStage(Some(BufferInfo(maxBufferSize, OverflowStrategy.fail)))) + + out.ensureSubscription() + + for (i ← 0 until maxBufferSize) { + in.sendNext(i.toString) + } + out.expectNoMessage(FiniteDuration.apply(1, TimeUnit.SECONDS)) + + in.expectRequest() + in.sendNext("dummy") + + out.expectComplete() + } + + it should "add/remove link after start of stage" in { + val (in, out1, out2) = materializeFanOut(new TemporaryLinkStage()) + + in.ensureSubscription() + out1.ensureSubscription() + out2.ensureSubscription() + + in.sendNext("dummy1") + out1.requestNext("dummy1") + out2.expectNoMessage(FiniteDuration.apply(1, TimeUnit.SECONDS)) + + in.sendNext("add") + + in.sendNext("dummy2") + in.sendNext("dummy3") + in.sendNext("dummy4") + in.sendNext("dummy5") + + in.sendNext("remove") + + out1.requestNext("dummy2") + out2.requestNext("dummy2") + out1.requestNext("dummy3") + out2.requestNext("dummy3") + out1.requestNext("dummy4") + out2.requestNext("dummy4") + out1.requestNext("dummy5") + out2.requestNext("dummy5") + + in.sendNext("dummy6") + out1.requestNext("dummy6") + + out2.expectNoMessage(FiniteDuration.apply(1, TimeUnit.SECONDS)) + out1.expectNoMessage(FiniteDuration.apply(1, TimeUnit.SECONDS)) + } + + it should "add/remove stopper" in { + val (in1, in2, out) = materializeFanIn(new StopperLinkStage()) + + in1.ensureSubscription() + in2.ensureSubscription() + out.ensureSubscription() + + in1.sendNext("dummy1") + out.expectNoMessage(FiniteDuration.apply(1, TimeUnit.SECONDS)) + + in2.sendNext("start") + out.requestNext("dummy1") + + in2.sendNext("stop") + in1.sendNext("dummy2") + out.expectNoMessage(FiniteDuration.apply(1, TimeUnit.SECONDS)) + + in2.sendNext("start") + + out.requestNext("dummy2") + + in1.sendNext("dummy3") + out.requestNext("dummy3") + + out.expectNoMessage(FiniteDuration.apply(1, TimeUnit.SECONDS)) + } + + it should "back pressure on fan-out links" in { + val (in, out1, out2) = materializeFanOut(new FanOutStage(None, None)) + + in.ensureSubscription() + out1.ensureSubscription() + out2.ensureSubscription() + + in.expectRequest() + in.sendNext("dummy1") + + in.expectRequest() + in.sendNext("dummy2") + + in.expectNoMessage(FiniteDuration.apply(1, TimeUnit.SECONDS)) + } + + it should "back pressure and buffer on fan-out links" in { + val (in, out1, out2) = materializeFanOut(new FanOutStage(None, Some(BufferInfo(maxBufferSize, OverflowStrategy.backpressure)))) + + in.ensureSubscription() + out1.ensureSubscription() + out2.ensureSubscription() + + in.expectRequest() + in.sendNext("dummy1") + + in.expectRequest() + in.sendNext("dummy2") + + in.expectNoMessage(FiniteDuration.apply(1, TimeUnit.SECONDS)) + + out1.requestNext("dummy1") + + in.expectRequest() + in.sendNext("dummy3") + + out1.requestNext("dummy2") + out1.requestNext("dummy3") + out2.requestNext("dummy1") + out2.requestNext("dummy2") + out2.requestNext("dummy3") + } + + it should "back pressure on fan-in links" in { + val (in1, in2, out) = materializeFanIn(new FanInStage(None, None)) + + in1.ensureSubscription() + in2.ensureSubscription() + out.ensureSubscription() + + in1.expectRequest() + in1.sendNext("dummy1") + + in1.expectRequest() + in1.sendNext("dummy2") + + in1.expectNoMessage(FiniteDuration.apply(1, TimeUnit.SECONDS)) + } + + it should "back pressure and buffer on fan-in links" in { + val (in1, in2, out) = materializeFanIn(new FanInStage(None, Some(BufferInfo(maxBufferSize, OverflowStrategy.backpressure)))) + + in1.ensureSubscription() + in2.ensureSubscription() + out.ensureSubscription() + + in1.expectRequest() + in1.sendNext("dummy1-1") + + in1.expectRequest() + in1.sendNext("dummy1-2") + + in1.expectNoMessage(FiniteDuration.apply(1, TimeUnit.SECONDS)) + + in2.expectRequest() + in2.sendNext("dummy2-1") + + in2.expectRequest() + in2.sendNext("dummy2-2") + + in2.expectRequest() + in2.sendNext("dummy2-3") + + in2.expectRequest() + + out.requestNext("dummy1-1") + out.requestNext("dummy2-1") + out.requestNext("dummy2-2") + out.requestNext("dummy2-3") + out.requestNext("dummy1-2") + } + + it should "order packets on buffered fan-in links" in { + val (in1, in2, out) = materializeFanIn(new FanInStage( + Some(BufferInfo(maxBufferSize, OverflowStrategy.backpressure)), + Some(BufferInfo(maxBufferSize, OverflowStrategy.backpressure)))) + + in1.ensureSubscription() + in2.ensureSubscription() + out.ensureSubscription() + + in1.expectRequest() + in1.sendNext("dummy1-1") + + in2.expectRequest() + in2.sendNext("dummy2-1") + + in2.expectRequest() + in2.sendNext("dummy2-2") + + in1.expectRequest() + in1.sendNext("dummy1-2") + + in1.expectRequest() + in1.sendNext("dummy1-3") + + in2.expectRequest() + in2.sendNext("dummy2-3") + + in1.expectRequest() + in2.expectRequest() + + out.requestNext("dummy1-1") + out.requestNext("dummy2-1") + out.requestNext("dummy2-2") + out.requestNext("dummy1-2") + out.requestNext("dummy1-3") + out.requestNext("dummy2-3") + } + + val flowShape = new FlowShape(Inlet[String]("In"), Outlet[String]("Out")) + val fanOutShape = new FanOutShape(Inlet[String]("In1"), Outlet[String]("Out1"), Outlet[String]("Out2")) + val fanInShape = new FanInShape(Inlet[String]("In1"), Inlet[String]("In2"), Outlet[String]("Out")) + + case class FanOutShape(in1: Inlet[String], out1: Outlet[String], out2: Outlet[String]) extends Shape { + override def inlets: immutable.Seq[Inlet[String]] = immutable.Seq(in1) + override def outlets: immutable.Seq[Outlet[String]] = immutable.Seq(out1, out2) + override def deepCopy(): Shape = new FanOutShape(in1.carbonCopy(), out1.carbonCopy(), out2.carbonCopy()) + } + + case class FanInShape(in1: Inlet[String], in2: Inlet[String], out: Outlet[String]) extends Shape { + override def inlets: immutable.Seq[Inlet[String]] = immutable.Seq(in1, in2) + override def outlets: immutable.Seq[Outlet[String]] = immutable.Seq(out) + override def deepCopy(): Shape = new FanInShape(in1.carbonCopy(), in2.carbonCopy(), out.carbonCopy()) + } + + case class BufferInfo(size: Int, overflowStrategy: OverflowStrategy) + + class FlowStage(buffer: Option[BufferInfo])(implicit system: ActorSystem, materializer: Materializer) extends GraphStage[FlowShape[String, String]] { + override def shape = flowShape + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { + val logics = new LinkedLogics(shape) + logics.add(new InputLogic(shape.in) { + val out = buffer match { + case Some(buffer) ⇒ + linkOutput(shape.out, buffer.size, buffer.overflowStrategy) + case None ⇒ + linkOutput(shape.out) + } + + override def inputHandler(packet: String): Unit = { + out.push(packet) + } + }) + logics + } + } + + class TemporaryLinkStage()(implicit system: ActorSystem, materializer: Materializer) extends GraphStage[FanOutShape] { + override def shape = fanOutShape + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { + val logics = new LinkedLogics(shape) + logics.add(new InputLogic(shape.in1) { + val out1 = linkOutput(shape.out1, 10, OverflowStrategy.backpressure) + var out2 = Option.empty[OutputLink[String]] + + override def inputHandler(packet: String): Unit = { + if (packet == "add") { + out2 = Some(linkOutput(shape.out2, 10, OverflowStrategy.backpressure)) + } else if (packet == "remove") { + out2.foreach(_.remove()) + out2 = None + } else { + out1.push(packet) + out2.foreach(_.push(packet)) + } + } + }) + logics + } + } + + class StopperLinkStage()(implicit system: ActorSystem, materializer: Materializer) extends GraphStage[FanInShape] { + override def shape = fanInShape + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { + val logics = new LinkedLogics(shape) + val logic1 = new InputLogic(shape.in1) { + val out = linkOutput(shape.out) + + override def inputHandler(packet: String): Unit = { + out.push(packet) + } + } + logics.add(logic1) + var stopper = Some(logic1.linkStopper()) + val logic2 = new InputLogic(shape.in2) { + override def inputHandler(packet: String): Unit = { + if (packet == "stop") { + stopper = Some(logic1.linkStopper()) + } else if (packet == "start") { + stopper.foreach(_.remove()) + } + } + } + logics.add(logic2) + logics + } + } + + class FanOutStage(buffer1: Option[BufferInfo], buffer2: Option[BufferInfo])(implicit system: ActorSystem, materializer: Materializer) extends GraphStage[FanOutShape] { + override def shape = fanOutShape + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { + val logics = new LinkedLogics(shape) + if (buffer1 == buffer2) { + logics.add(new InputLogic(shape.in1) { + val outs = buffer1 match { + case Some(buffer) ⇒ + linkOutputs(Seq(shape.out1, shape.out2), buffer.size, buffer.overflowStrategy) + case None ⇒ + linkOutputs(Seq(shape.out1, shape.out2)) + } + + override def inputHandler(packet: String): Unit = { + outs(0).push(packet) + outs(1).push(packet) + } + }) + } else { + logics.add(new InputLogic(shape.in1) { + val out1 = buffer1 match { + case Some(buffer) ⇒ + linkOutput(shape.out1, buffer.size, buffer.overflowStrategy) + case None ⇒ + linkOutput(shape.out1) + } + val out2 = buffer2 match { + case Some(buffer) ⇒ + linkOutput(shape.out2, buffer.size, buffer.overflowStrategy) + case None ⇒ + linkOutput(shape.out2) + } + + override def inputHandler(packet: String): Unit = { + out1.push(packet) + out2.push(packet) + } + }) + } + logics + } + } + + class FanInStage(buffer1: Option[BufferInfo], buffer2: Option[BufferInfo])(implicit system: ActorSystem, materializer: Materializer) extends GraphStage[FanInShape] { + override def shape = fanInShape + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { + val logics = new LinkedLogics(shape) + logics.add(new InputLogic(shape.in1) { + val out = buffer1 match { + case Some(buffer) ⇒ + linkOutput(shape.out, buffer.size, buffer.overflowStrategy) + case None ⇒ + linkOutput(shape.out) + } + + override def inputHandler(packet: String): Unit = { + out.push(packet) + } + }) + logics.add(new InputLogic(shape.in2) { + val out = buffer2 match { + case Some(buffer) ⇒ + linkOutput(shape.out, buffer.size, buffer.overflowStrategy) + case None ⇒ + linkOutput(shape.out) + } + + override def inputHandler(packet: String): Unit = { + out.push(packet) + } + }) + logics + } + } + + def materializeFlow(stage: GraphStage[FlowShape[String, String]]) = { + val in = TestSource.probe[String] + val out = TestSink.probe[String] + + val (in_, out_) = + RunnableGraph.fromGraph(GraphDSL.create(in, out)((_, _)) { implicit builder ⇒ (in, out) ⇒ + import GraphDSL.Implicits._ + + val ports = builder.add(stage) + + in ~> ports.in + ports.out ~> out + + ClosedShape + }).run() + (in_, out_) + } + + def materializeFanOut(stage: GraphStage[FanOutShape]) = { + val in1 = TestSource.probe[String] + val out1 = TestSink.probe[String] + val out2 = TestSink.probe[String] + + val (in1_, out1_, out2_) = + RunnableGraph.fromGraph(GraphDSL.create(in1, out1, out2)((_, _, _)) { implicit builder ⇒ (in1, out1, out2) ⇒ + import GraphDSL.Implicits._ + + val ports = builder.add(stage) + + in1 ~> ports.in1 + + ports.out1 ~> out1 + ports.out2 ~> out2 + + ClosedShape + }).run() + (in1_, out1_, out2_) + } + + def materializeFanIn(stage: GraphStage[FanInShape]) = { + val in1 = TestSource.probe[String] + val in2 = TestSource.probe[String] + val out = TestSink.probe[String] + + val (in1_, in2_, out_) = + RunnableGraph.fromGraph(GraphDSL.create(in1, in2, out)((_, _, _)) { implicit builder ⇒ (in1, in2, out) ⇒ + import GraphDSL.Implicits._ + + val ports = builder.add(stage) + + in1 ~> ports.in1 + in2 ~> ports.in2 + + ports.out ~> out + + ClosedShape + }).run() + (in1_, in2_, out_) + } +} diff --git a/contrib/src/test/scala/akka/stream/contrib/linkedlogics/TimerLinkedLogicTest.scala b/contrib/src/test/scala/akka/stream/contrib/linkedlogics/TimerLinkedLogicTest.scala new file mode 100644 index 0000000..7553fbe --- /dev/null +++ b/contrib/src/test/scala/akka/stream/contrib/linkedlogics/TimerLinkedLogicTest.scala @@ -0,0 +1,166 @@ +package akka.stream.stage.linkedlogic + +import java.util.concurrent.TimeUnit + +import akka.actor.ActorSystem +import akka.stream._ +import akka.stream.scaladsl.{ GraphDSL, RunnableGraph } +import akka.stream.stage.linkedlogic.logics.{ InputLogic, OnceScheduledTimerLogic, PeriodicallyTimerLogic } +import akka.stream.stage.{ GraphStage, GraphStageLogic } +import akka.stream.testkit.scaladsl.{ TestSink, TestSource } +import akka.testkit.{ ImplicitSender, TestKit } +import org.scalatest.{ BeforeAndAfterAll, FlatSpecLike, Matchers } + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class TimerLinkedLogicTest(sys: ActorSystem) extends TestKit(sys) + with ImplicitSender with Matchers with FlatSpecLike with BeforeAndAfterAll { + + def this() = this(ActorSystem("TimerLinkedLogicTest")) + + implicit val materializer = ActorMaterializer( + ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 1, maxSize = 1) + ) + + val log = sys.log + + override def afterAll: Unit = { + sys.terminate() + Await.result(sys.whenTerminated, FiniteDuration.apply(10, TimeUnit.SECONDS)) + } + + val maxBufferSize = 10 + + behavior of "TimerLinkedLogic" + + it should "execute periodically timer" in { + val (in1, out1, in2, out2) = materialize(new PeriodicallyTimerStage(FiniteDuration(1, TimeUnit.SECONDS))) + + out1.ensureSubscription() + + out1.request(3) + out2.request(3) + out1.expectNext("0-1") + out2.expectNext("0-2") + out1.expectNext("1-1") + out2.expectNext("1-2") + out1.expectNext("2-1") + out2.expectNext("2-2") + } + + it should "cancel periodically timer" in { + val (in1, out1, in2, out2) = materialize(new PeriodicallyTimerStage(FiniteDuration(1, TimeUnit.SECONDS))) + + out1.ensureSubscription() + + out1.request(3) + out2.request(3) + out1.expectNext("0-1") + out2.expectNext("0-2") + out1.expectNext("1-1") + out2.expectNext("1-2") + in1.sendNext("cancel") + + out1.expectNoMessage(FiniteDuration.apply(1, TimeUnit.SECONDS)) + out2.expectNoMessage(FiniteDuration.apply(1, TimeUnit.SECONDS)) + } + + it should "do not execute timer with back-pressure condition" in { + val (in1, out1, in2, out2) = materialize(new PeriodicallyTimerStage(FiniteDuration(1, TimeUnit.SECONDS))) + + out1.ensureSubscription() + + out1.requestNext("0-1") + out1.expectNoMessage(FiniteDuration.apply(1, TimeUnit.SECONDS)) + } + + it should "add and execute timer during execution of logic" in { + val (in1, out1, in2, out2) = materialize(new EventDrivenCreationOfTimerStage(FiniteDuration(1, TimeUnit.SECONDS))) + + in1.ensureSubscription() + out1.ensureSubscription() + + in1.sendNext("0") + out1.requestNext("0-1") + out2.requestNext("0-2") + out1.expectNoMessage(FiniteDuration.apply(1, TimeUnit.SECONDS)) + } + + class PeriodicallyTimerStage(period: FiniteDuration)(implicit system: ActorSystem, materializer: Materializer) extends GraphStage[BidiShape[String, String, String, String]] { + override val shape = new BidiShape(Inlet[String]("In1"), Outlet[String]("Out1"), Inlet[String]("In2"), Outlet[String]("Out2")) + + var sequence = 0 + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { + val logics = new TimerLinkedLogics(shape) + val timer = logics.add(new PeriodicallyTimerLogic(period) { + val out1 = linkOutput(shape.out1) + val out2 = linkOutput(shape.out2) + + override def timerHandler(): Unit = { + out1.push(sequence.toString + "-1") + out2.push(sequence.toString + "-2") + sequence += 1 + } + }) + logics.add(new InputLogic(shape.in1) { + override def inputHandler(data: String): Unit = { + timer.cancel() + } + }) + logics.add(new InputLogic(shape.in2) { + override def inputHandler(data: String): Unit = {} + }) + logics + } + } + + class EventDrivenCreationOfTimerStage(period: FiniteDuration)(implicit system: ActorSystem, materializer: Materializer) extends GraphStage[BidiShape[String, String, String, String]] { + override val shape = new BidiShape(Inlet[String]("In1"), Outlet[String]("Out1"), Inlet[String]("In2"), Outlet[String]("Out2")) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { + val logics = new TimerLinkedLogics(shape) + logics.add(new InputLogic(shape.in1) { + override def inputHandler(data: String): Unit = { + val timerLogic = logics.add(new OnceScheduledTimerLogic(period) { + val out1 = linkOutput(shape.out1) + val out2 = linkOutput(shape.out2) + override def timerHandler(): Unit = { + out1.push(data + "-1") + out2.push(data + "-2") + } + }) + } + }) + logics.add(new InputLogic(shape.in2) { + override def inputHandler(data: String): Unit = {} + }) + logics + } + } + + def materialize(stage: GraphStage[BidiShape[String, String, String, String]]) = { + val in1 = TestSource.probe[String] + val out1 = TestSink.probe[String] + val in2 = TestSource.probe[String] + val out2 = TestSink.probe[String] + + val (out_) = + RunnableGraph.fromGraph(GraphDSL.create(in1, out1, in2, out2)((_, _, _, _)) { implicit builder ⇒ (in1, out1, in2, out2) ⇒ + import GraphDSL.Implicits._ + + val ports = builder.add(stage) + + in1 ~> ports.in1 + ports.out1 ~> out1 + + in2 ~> ports.in2 + ports.out2 ~> out2 + + ClosedShape + }).run() + out_ + } +}