Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternative way of defining Graph Stages #121

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.stage.linkedlogic

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic }
import akka.stream.stage.linkedlogic.impl.SlotsCollector.LogicInterface
import akka.stream.stage.linkedlogic.impl.TimerSlotsCollector.TimerLogicInterface
import akka.stream.stage.linkedlogic.impl.{ TimerLogicsCollector, _ }
import akka.stream.stage.linkedlogic.logics.{ InputLogic, OnceScheduledTimerLogic, PeriodicallyTimerLogic, TimerLogic }
import akka.stream.{ Inlet, Outlet, Shape }

import scala.concurrent.duration.FiniteDuration

/**
* Represents processing logics with defined links between inlets and outlets.
* We must to define the logic for all inlets in the shape before LinkedLogics started.
*/
final class LinkedLogics(shape: Shape)(implicit system: ActorSystem) extends GraphStageLogic(shape) { self ⇒
private val log = Logging(system, getClass)

private val slots = new SlotsCollector(shape, new LogicInterface {
override def cancel[T](in: Inlet[T]): Unit = self.cancel(in)
override def setHandler(in: Inlet[_], handler: InHandler): Unit = self.setHandler(in, handler)
override def setHandler(out: Outlet[_], handler: OutHandler): Unit = self.setHandler(out, handler)
override def isAvailable[T](in: Inlet[T]): Boolean = self.isAvailable(in)
override def isAvailable[T](out: Outlet[T]): Boolean = self.isAvailable(out)
override def hasBeenPulled[T](in: Inlet[T]): Boolean = self.hasBeenPulled(in)
override def push[T](out: Outlet[T], elem: T): Unit = self.push(out, elem)
override def tryPull[T](in: Inlet[T]): Unit = self.tryPull(in)
override def completeStage(): Unit = self.completeStage()
override def grab[T](in: Inlet[T]): T = self.grab(in)
})

private val logics = new InputLogicsCollector()

override def preStart(): Unit = {
logics.start(slots)
super.preStart()
}

/**
* Adds logic of processing elements from the inlet.
*/
def add[In](logic: InputLogic[In]): InputLogic[In] = {
logics.addInputLogic(logic)
logic
}
}

/**
* Represents processing logics with predefined links between inlets, timers and outlets.
*/
final class TimerLinkedLogics(shape: Shape)(implicit system: ActorSystem) extends TimerGraphStageLogic(shape) { self ⇒
private val log = Logging(system, getClass)

private val slots = new TimerSlotsCollector(shape, new TimerLogicInterface {
override def cancel[T](in: Inlet[T]): Unit = self.cancel(in)
override def setHandler(in: Inlet[_], handler: InHandler): Unit = self.setHandler(in, handler)
override def setHandler(out: Outlet[_], handler: OutHandler): Unit = self.setHandler(out, handler)
override def isAvailable[T](in: Inlet[T]): Boolean = self.isAvailable(in)
override def isAvailable[T](out: Outlet[T]): Boolean = self.isAvailable(out)
override def hasBeenPulled[T](in: Inlet[T]): Boolean = self.hasBeenPulled(in)
override def push[T](out: Outlet[T], elem: T): Unit = self.push(out, elem)
override def tryPull[T](in: Inlet[T]): Unit = self.pull(in)
override def completeStage(): Unit = self.completeStage()
override def grab[T](in: Inlet[T]): T = self.grab(in)
override def scheduleOnce(timerKey: Any, delay: FiniteDuration): Unit = self.scheduleOnce(timerKey, delay)
override def schedulePeriodically(timerKey: Any, interval: FiniteDuration): Unit = self.schedulePeriodically(timerKey, interval)
override def schedulePeriodicallyWithInitialDelay(timerKey: Any, initialDelay: FiniteDuration, interval: FiniteDuration): Unit =
self.schedulePeriodicallyWithInitialDelay(timerKey, initialDelay, interval)
override def cancelTimer(timerKey: Any): Unit = self.cancelTimer(timerKey)
})

private val logics = new TimerLogicsCollector()

override def preStart(): Unit = {
logics.start(slots)
super.preStart()
}

/**
* Adds logic of processing elements from inlet.
*/
def add[In](logic: InputLogic[In]): InputLogic[In] = {
logics.addInputLogic(logic)
if (logics.isStarted()) {
logic.start(slots)
}
logic
}

/**
* Adds logic of processing elements from the timer.
*/
def add[Logic <: TimerLogic](logic: Logic): Logic = {
logics.addTimerLogic(logic)
if (logics.isStarted()) {
logic.start(slots)
}
logic
}

protected final override def onTimer(timerKey: Any): Unit = {
slots.onTimer(timerKey)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.stage.linkedlogic.impl

import akka.stream.{ Inlet, Shape }
import akka.stream.stage.linkedlogic.logics.{ InputLogic, TimerLogic }

/**
* Collector of input logics.
* Input logics can be added initially or in progress.
*/
private[linkedlogic] class InputLogicsCollector {
private var inputLogics = Map.empty[Inlet[_], InputLogic[_]]
private var started = false

def isStarted() = started

def start(slots: SlotsCollector): Unit = {
slots.shape.inlets.foreach(inlet ⇒ {
if (!inputLogics.isDefinedAt(inlet)) {
sys.error(s"Logic of inlet ${inlet} is not defined")
}
})
inputLogics.values.foreach(_.start(slots))
started = true
}

def addInputLogic[In](logic: InputLogic[In]): Unit = {
if (started) {
sys.error("Input logic can not be added after start")
}
if (inputLogics.isDefinedAt(logic.inlet)) {
sys.error(s"Logic of ${logic.inlet} is already defined")
}
inputLogics += (logic.inlet -> logic)
logic.setOnTerminateHandler(() ⇒ inputLogics -= logic.inlet)
}
}

/**
* Collector of input and timer logics.
* Timer logics can be added initially or in progress.
* It is possible also to remove timer logic.
*/
private[linkedlogic] final class TimerLogicsCollector extends InputLogicsCollector {
private var timerLogics = Set.empty[TimerLogic]

def start(slots: TimerSlotsCollector): Unit = {
timerLogics.foreach(_.start(slots))
super.start(slots)
}

def addTimerLogic[Logic <: TimerLogic](logic: Logic): Unit = {
timerLogics += logic
logic.setOnTerminateHandler(() ⇒ timerLogics -= logic)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.stage.linkedlogic.impl

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.stage.linkedlogic.impl.SlotsCollector.LogicInterface
import akka.stream.stage.linkedlogic.impl.TimerSlotsCollector.TimerLogicInterface
import akka.stream.stage.linkedlogic.impl.slots.{ EnteringSlot, InputSlot, OutputSlot, TimerSlot }
import akka.stream.stage.{ InHandler, OutHandler }
import akka.stream.{ Inlet, Outlet, Shape }

import scala.concurrent.duration.FiniteDuration

/**
* Represents the collection of entering and output slots.
* Slots for the inlets and outlets are created initially.
*/
private[linkedlogic] class SlotsCollector(val shape: Shape, logic: LogicInterface)(implicit system: ActorSystem) {
private val log = Logging(system, getClass)

protected var enteringSlotIdSequence = 0
protected var outputSlotIdSequence = 0

protected var enteringSlots: Map[Int, EnteringSlot] = makeEnteringSlots()
protected var outputSlots: Map[Int, OutputSlot[_]] = makeOutputSlots()

private def makeEnteringSlots() = {
var inSlots = Map.empty[Int, EnteringSlot]
for (index ← 0 until shape.getInlets.size()) {
val inlet = shape.getInlets.get(index)
val slot = new InputSlot[Any](enteringSlotIdSequence, () ⇒ logic.isAvailable(inlet), () ⇒ logic.grab(inlet),
() ⇒ if (!logic.hasBeenPulled(inlet)) logic.tryPull(inlet))
enteringSlotIdSequence += 1
logic.setHandler(inlet, slot.makeHandler())
inSlots += (slot.id -> slot)
}
inSlots
}

private def makeOutputSlots() = {
var outSlots = Map.empty[Int, OutputSlot[_]]
for (index ← 0 until shape.getOutlets.size()) {
val outlet = shape.getOutlets.get(index).asInstanceOf[Outlet[Any]]
val slot = new OutputSlot[Any](outputSlotIdSequence, outlet.toString(), () ⇒ logic.isAvailable(outlet), p ⇒ logic.push(outlet, p),
(inboundIndex, available) ⇒ enteringSlots(inboundIndex).notifyAvailable(index, available), () ⇒ logic.completeStage())
outputSlotIdSequence += 1
logic.setHandler(outlet, slot.makeHandler())
outSlots += (slot.id -> slot)
}
outSlots
}

def getEnteringSlot[In](slotId: Int) = enteringSlots.get(slotId)

def getOutputSlot[Out](slotId: Int) = outputSlots.get(slotId)

def getInputSlot[In](inlet: Inlet[In]) = {
val index = shape.getInlets.indexOf(inlet)
if (index == -1) {
sys.error(s"No inlet ${inlet} in shape, existing ${shape.getInlets}")
}
enteringSlots.get(index) match {
case Some(inputSlot: InputSlot[_]) ⇒
inputSlot.asInstanceOf[InputSlot[In]]
case Some(_) ⇒
sys.error(s"Slot ${index} is not the input")
case None ⇒
sys.error(s"No input slot ${index}")
}
}

def getOutputSlot[Out](outlet: Outlet[Out]) = {
val index = shape.getOutlets.indexOf(outlet)
if (index == -1) {
sys.error(s"No outlet ${outlet} in shape, existing ${shape.getOutlets}")
}
outputSlots.get(index) match {
case Some(outoutSlot) ⇒
outputSlots(index).asInstanceOf[OutputSlot[Out]]
case None ⇒
sys.error(s"No output slot ${index}")
}
}

def removeEnteringSlot(enteringSlotId: Int) = {
for (timerSlot ← enteringSlots.get(enteringSlotId)) {
for (outSlotId ← timerSlot.getLinks()) {
for (outSlot ← getOutputSlot(outSlotId)) {
outSlot.unlink(timerSlot.id)
}
}
enteringSlots -= enteringSlotId
}
}
}

object SlotsCollector {
trait LogicInterface {
def setHandler(in: Inlet[_], handler: InHandler): Unit
def isAvailable[T](in: Inlet[T]): Boolean
def hasBeenPulled[T](in: Inlet[T]): Boolean
def tryPull[T](in: Inlet[T]): Unit
def grab[T](in: Inlet[T]): T
def cancel[T](in: Inlet[T]): Unit
def setHandler(out: Outlet[_], handler: OutHandler): Unit
def isAvailable[T](out: Outlet[T]): Boolean
def push[T](out: Outlet[T], elem: T): Unit
def completeStage(): Unit
}
}

/**
* Represents the collection of input, output and timer slots.
* Timer slots may be added and removed in progress.
*/
private[linkedlogic] class TimerSlotsCollector(shape: Shape, logic: TimerLogicInterface)(implicit system: ActorSystem) extends SlotsCollector(shape, logic) {
private[linkedlogic] def addOnceScheduledTimerSlot(delay: FiniteDuration): TimerSlot = {
var timerSlotId = enteringSlotIdSequence; enteringSlotIdSequence += 1
val timerSlot = new TimerSlot(timerSlotId, () ⇒ logic.scheduleOnce(timerSlotId, delay), () ⇒ logic.cancelTimer(timerSlotId))
enteringSlots += (timerSlot.id -> timerSlot)
timerSlot
}

def addPeriodicallyTimerSlot(interval: FiniteDuration, initialDelay: Option[FiniteDuration] = None): TimerSlot = {
var timerSlotId = enteringSlotIdSequence; enteringSlotIdSequence += 1
val timerSlot = new TimerSlot(timerSlotId, () ⇒ initialDelay match {
case Some(initialDelay) ⇒ logic.schedulePeriodicallyWithInitialDelay(timerSlotId, initialDelay, interval)
case None ⇒ logic.schedulePeriodically(timerSlotId, interval)
}, () ⇒ logic.cancelTimer(timerSlotId))
enteringSlots += (timerSlot.id -> timerSlot)
timerSlot
}

def onTimer(timerKey: Any): Unit = {
val timerSlotId = timerKey.asInstanceOf[Int]
enteringSlots.get(timerSlotId) match {
case Some(timerSlot: TimerSlot) ⇒
timerSlot.process()
case Some(_) ⇒
sys.error(s"Slot ${timerSlotId} is not the timer")
case None ⇒
}
}
}

object TimerSlotsCollector {
trait TimerLogicInterface extends LogicInterface {
def scheduleOnce(timerKey: Any, delay: FiniteDuration): Unit
def schedulePeriodically(timerKey: Any, interval: FiniteDuration): Unit
def schedulePeriodicallyWithInitialDelay(timerKey: Any, initialDelay: FiniteDuration, interval: FiniteDuration): Unit
def cancelTimer(timerKey: Any): Unit
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.stage.linkedlogic.impl.slots

/**
* Base class of logic slot for an inlet or a timer.
*/
private[linkedlogic] abstract class EnteringSlot(val id: Int) {
private var links = Set.empty[Int]
private var availableLinks = Set.empty[Int]
private var onTerminateHandler = Option.empty[() ⇒ Unit]

protected def isPending(): Boolean
protected def process(): Unit
protected def requestNext(): Unit

def getLinks() = links

def setOnTerminateHandler(onTerminateHandler: () ⇒ Unit): Unit = {
this.onTerminateHandler = Some(onTerminateHandler)
}

def link(outputSlotId: Int): Unit = {
if (links.contains(outputSlotId)) {
sys.error(s"Link to the ${outputSlotId} already exists")
}
links += outputSlotId
}

def unlink(outputSlotId: Int): Unit = {
links -= outputSlotId
availableLinks -= outputSlotId
handleIfReadyToProcess()
}

def notifyAvailable(outputSlotId: Int, available: Boolean): Unit = {
if (available) {
availableLinks += outputSlotId
handleIfReadyToProcess()
} else {
availableLinks -= outputSlotId
}
}

def isReadyToProcess(): Boolean = {
links.size == availableLinks.size
}

def close(): Unit = {
onTerminateHandler.foreach(_.apply())
}

private def handleIfReadyToProcess(): Unit = {
if (isReadyToProcess()) {
if (isPending()) {
process()
} else {
requestNext()
}
}
}
}
Loading