Skip to content

Commit

Permalink
Improved event structure and introduced the PiObservable trait.
Browse files Browse the repository at this point in the history
This is work in progress - code is broken

AkkaExecutor is now complying, but there is more to do and the tests are
broken.

See issue #5 for more information
  • Loading branch information
PetrosPapapa committed Oct 8, 2018
1 parent 0baee73 commit e538f41
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 95 deletions.
128 changes: 69 additions & 59 deletions src/com/workflowfm/pew/PiEventHandler.scala
Original file line number Diff line number Diff line change
@@ -1,27 +1,40 @@
package com.workflowfm.pew

import scala.concurrent.{Promise,Future}
import scala.collection.immutable.Queue

sealed trait PiEvent[KeyT] {
def id:KeyT
}

sealed trait PiEvent[KeyT]
case class PiEventStart[KeyT](i:PiInstance[KeyT]) extends PiEvent[KeyT] {
override def id = i.id
}
case class PiEventResult[KeyT](i:PiInstance[KeyT], res:Any) extends PiEvent[KeyT] {
override def id = i.id
}
case class PiEventFailure[KeyT](i:PiInstance[KeyT], reason:Throwable) extends PiEvent[KeyT] {
override def id = i.id
}
case class PiEventException[KeyT](override val id:KeyT, reason:Throwable) extends PiEvent[KeyT]
case class PiEventCall[KeyT](override val id:KeyT, ref:Int, p:AtomicProcess, args:Seq[PiObject]) extends PiEvent[KeyT]
case class PiEventReturn[KeyT](override val id:KeyT, ref:Int, result:Any) extends PiEvent[KeyT]
case class PiEventProcessException[KeyT](override val id:KeyT, ref:Int, reason:Throwable) extends PiEvent[KeyT]

//case class PiEventStart[KeyT](i:PiInstance[KeyT]) extends PiEvent[KeyT]
case class PiEventResult[KeyT](i:PiInstance[KeyT], res:Any) extends PiEvent[KeyT]
case class PiEventFailure[KeyT](i:PiInstance[KeyT], reason:Throwable) extends PiEvent[KeyT]
case class PiEventException[KeyT](i:KeyT, reason:Throwable) extends PiEvent[KeyT]
case class PiEventCall[KeyT](id:KeyT, ref:Int, p:AtomicProcess, args:Seq[PiObject]) extends PiEvent[KeyT]
case class PiEventReturn[KeyT](id:KeyT, ref:Int, result:Any) extends PiEvent[KeyT]
case class PiEventProcessException[KeyT](id:KeyT, ref:Int, reason:Throwable) extends PiEvent[KeyT]
// Return true if the handler is done and needs to be unsubscribed.

trait PiEventHandler[KeyT] extends (PiEvent[KeyT]=>Boolean) {
def name:String
def and(h:PiEventHandler[KeyT]) = MultiPiEventHandler(this,h)
}

trait PiEventHandler[KeyT,InitT] extends (PiEvent[KeyT]=>Unit){
def init(i:PiInstance[KeyT]):InitT
trait PiEventHandlerFactory[T] {
def build(id:T):PiEventHandler[T]
}

class DefaultHandler[T] extends PiEventHandler[T,Unit] {
override def init(i:PiInstance[T]):Unit = System.err.println(" === INITIAL STATE " + i.id + " === \n" + i.state + "\n === === === === === === === ===")

override def apply(e:PiEvent[T]) = e match {
class PrintEventHandler[T] extends PiEventHandler[T] {
override def apply(e:PiEvent[T]) = { e match {
case PiEventStart(i) => System.err.println(" === INITIAL STATE " + i.id + " === \n" + i.state + "\n === === === === === === === ===")
case PiEventResult(i,res) => {
System.err.println(" === FINAL STATE " + i.id + " === \n" + i.state + "\n === === === === === === === ===")
System.err.println(" === RESULT FOR " + i.id + ": " + res)
Expand All @@ -40,57 +53,54 @@ class DefaultHandler[T] extends PiEventHandler[T,Unit] {
case PiEventProcessException(id,ref,reason) => {
System.err.println(" === PROCESS FAILED: " + id + " === " + ref + " === Exception: " + reason)
reason.printStackTrace()
}
}
}
false }
}

class PromiseHandler[T](override val name:String, val id:T) extends PiEventHandler[T] {
val promise = Promise[Any]()
def future = promise.future

override def apply(e:PiEvent[T]) = if (e.id == this.id) e match {
case PiEventResult(i,res) => promise.success(res); true
case PiEventFailure(i,reason) => promise.failure(reason); true
case PiEventException(id,reason) => promise.failure(reason); true
case PiEventProcessException(id,ref,reason) => promise.failure(reason); true
case _ => false
} else false
}

class PromiseHandler[T] extends PiEventHandler[T,Future[Any]]{
val default = new DefaultHandler[T]
var promises:Map[T,Promise[Any]] = Map()
class PromiseHandlerFactory[T](name:String) extends PiEventHandlerFactory[T] {
override def build(id:T) = new PromiseHandler[T](name,id)
}

override def apply(e:PiEvent[T]) = e match {
case PiEventResult(i,res) => promises synchronized {
default(e)
promises.get(i.id) match {
case None => Unit
case Some(p) => p.success(res)
}
promises = promises - i.id
}
case PiEventFailure(i,reason) => promises synchronized {
default(e)
promises.get(i.id) match {
case None => Unit
case Some(p) => p.failure(reason)
}
}
case PiEventException(id,reason) => promises synchronized {
default(e)
promises.get(id) match {
case None => Unit
case Some(p) => p.failure(reason)
}
}
case PiEventProcessException(id,ref,reason) => promises synchronized {
default(e)
promises.get(id) match {
case None => Unit
case Some(p) => p.failure(reason)
}
}
case _ => default(e)
}

override def init(i:PiInstance[T]) = promises synchronized {
default.init(i)
val promise = Promise[Any]()
promises += (i.id -> promise)
promise.future
}
case class MultiPiEventHandler[T](handlers:Queue[PiEventHandler[T]]) extends PiEventHandler[T] {
override def apply(e:PiEvent[T]) = handlers map (_(e)) forall (_ == true)
override def and(h:PiEventHandler[T]) = MultiPiEventHandler(handlers :+ h)
}

object MultiPiEventHandler {
def apply[T](handlers:PiEventHandler[T]*):MultiPiEventHandler[T] = MultiPiEventHandler[T](Queue[PiEventHandler[T]]() ++ handlers)
}


trait PiObservable[T] {
def subscribe(handler:PiEventHandler[T]):Unit
def unsubscribe(handlerName:String):Unit
}

object PromiseHandler {
type ResultT = Future[Any]
trait SimplePiObservable[T] extends PiObservable[T] {
var handlers:Queue[PiEventHandler[T]] = Queue()

override def subscribe(handler:PiEventHandler[T]):Unit =
handlers = handlers :+ handler

override def unsubscribe(handlerName:String):Unit = {
handlers = handlers filter (_.name!=handlerName)
}

def publish(evt:PiEvent[T]) = {
handlers = handlers filterNot (_(evt))
}
}
44 changes: 36 additions & 8 deletions src/com/workflowfm/pew/execution/AkkaExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,31 @@ import akka.pattern.ask
import akka.pattern.pipe
import akka.util.Timeout

class AkkaPiEventHandler(handler:PiEventHandler[Int]) extends Actor {
def receive = {
case AkkaExecutor.E(event) => if (handler(event)) unsubscribe(context)
case AkkaExecutor.Unsubscribe(name:String) => if (name == handler.name) unsubscribe(context)
}

def unsubscribe(context:ActorContext) = {
context.system.eventStream.unsubscribe(self,classOf[AkkaExecutor.Event])
context.stop(self)
}
}

class AkkaExecutor(store:PiInstanceStore[Int], processes:PiProcessStore)(implicit system: ActorSystem, override implicit val context: ExecutionContext = ExecutionContext.global, implicit val timeout:FiniteDuration = 10.seconds) extends FutureExecutor {
trait AkkaPiObservable extends PiObservable[Int] {
implicit val system:ActorSystem
override def subscribe(handler:PiEventHandler[Int]):Boolean = {
val handlerActor = system.actorOf(AkkaExecutor.handlerprops(handler))
system.eventStream.subscribe(handlerActor, classOf[AkkaExecutor.Event])
}
override def unsubscribe(name:String):Unit = {
system.eventStream.publish(AkkaExecutor.Unsubscribe(name))
}
}


class AkkaExecutor(store:PiInstanceStore[Int], processes:PiProcessStore)(implicit system: ActorSystem, override implicit val context: ExecutionContext = ExecutionContext.global, implicit val timeout:FiniteDuration = 10.seconds) extends ProcessExecutor[Int] with AkkaPiObservable {
def this(store:PiInstanceStore[Int], l:PiProcess*)(implicit system: ActorSystem, context: ExecutionContext, timeout:FiniteDuration) = this(store,SimpleProcessStore(l :_*))
def this(system: ActorSystem, context: ExecutionContext, timeout:FiniteDuration,l:PiProcess*) = this(SimpleInstanceStore(),SimpleProcessStore(l :_*))(system,context,timeout)
def this(l:PiProcess*)(implicit system: ActorSystem) = this(SimpleInstanceStore(),SimpleProcessStore(l :_*))(system,ExecutionContext.global,10.seconds)
Expand All @@ -22,8 +45,8 @@ class AkkaExecutor(store:PiInstanceStore[Int], processes:PiProcessStore)(implici

override def simulationReady = Await.result(execActor ? AkkaExecutor.SimReady,timeout).asInstanceOf[Boolean]

override def execute(process:PiProcess,args:Seq[Any]):Future[Future[Any]] =
Future.successful((execActor ? AkkaExecutor.Call(process,args map PiObject.apply)))
override def call(process:PiProcess,args:Seq[PiObject]):Future[Int] =
execActor ? AkkaExecutor.Call(process,args) map (_.asInstanceOf[Int])
}

object AkkaExecutor {
Expand All @@ -39,18 +62,23 @@ object AkkaExecutor {
case object Ping
case object SimReady

sealed trait Event
case class E(evt:PiEvent[Int]) extends Event
case class Unsubscribe(name:String) extends Event

def atomicprops(implicit context: ExecutionContext = ExecutionContext.global): Props = Props(new AkkaAtomicProcessExecutor())
def execprops(store:PiInstanceStore[Int], processes:PiProcessStore)(implicit system: ActorSystem, exc: ExecutionContext): Props = Props(new AkkaExecActor(store,processes))
def handlerprops(handler:PiEventHandler[Int])(implicit context: ExecutionContext = ExecutionContext.global): Props = Props(new AkkaPiEventHandler(handler))
}

class AkkaExecActor(var store:PiInstanceStore[Int], processes:PiProcessStore)(implicit system: ActorSystem, implicit val exc: ExecutionContext = ExecutionContext.global) extends Actor {
var ctr:Int = 0
val handle = new PromiseHandler[Int]

def call(p:PiProcess,args:Seq[PiObject]) = {

def handle(evt:PiEvent[Int]) = system.eventStream.publish(AkkaExecutor.E(evt))

def call(p:PiProcess,args:Seq[PiObject]):Future[Int] = {
//System.err.println("*** [" + ctr + "] Starting call of:" + p.name)
val inst = PiInstance(ctr,p,args:_*)
val ret = handle.init(inst)
val ni = inst.reduce
if (ni.completed) ni.result match {
case None => {
Expand All @@ -68,7 +96,7 @@ class AkkaExecActor(var store:PiInstanceStore[Int], processes:PiProcessStore)(im
}
ctr = ctr + 1
//System.err.println("*** [" + (ctr-1) + "] Done init.")
ret
return Future.successful(ctr-1)
}

final def postResult(id:Int,ref:Int, res:PiObject):Unit = {
Expand Down
67 changes: 39 additions & 28 deletions src/com/workflowfm/pew/execution/ProcessExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.workflowfm.pew.execution
import com.workflowfm.pew._
import scala.concurrent._
import scala.concurrent.duration._
import scala.collection.mutable.Map

/**
* Executes an atomic process - blocking
Expand All @@ -24,17 +25,27 @@ case class AtomicProcessExecutor(process:AtomicProcess) {
/**
* Trait representing the ability to execute any PiProcess
*/
trait ProcessExecutor[R] {
def execute(process:PiProcess,args:Seq[Any]):Future[R]
def simulationReady:Boolean
}
trait ProcessExecutor[KeyT] { this:PiObservable[KeyT] =>
def call(process:PiProcess,args:Seq[PiObject]):Future[KeyT]
def simulationReady:Boolean

implicit val context: ExecutionContext //= ExecutionContext.global

trait FutureExecutor extends ProcessExecutor[PromiseHandler.ResultT] {
implicit val context: ExecutionContext = ExecutionContext.global
def execute(process:PiProcess,args:Seq[Any]):Future[PromiseHandler.ResultT]
def execute(process:PiProcess,args:Seq[Any]):Future[KeyT] = {
call(process,args map PiObject.apply)
}

def execute(process:PiProcess,args:Seq[Any],factory:PiEventHandlerFactory[KeyT]):Future[PiEventHandler[KeyT]] = {
execute(process,args) map { id =>
val handler = factory.build(id)
subscribe(handler)
handler
}
}
}

object ProcessExecutor {
def default = SingleBlockingExecutor(Map[String,PiProcess]())
//def default = SingleBlockingExecutor(Map[String,PiProcess]())

final case class AlreadyExecutingException(private val cause: Throwable = None.orNull)
extends Exception("Unable to execute more than one process at a time", cause)
Expand All @@ -51,23 +62,23 @@ object ProcessExecutor {
/**
* Shortcut methods for unit testing
*/
trait ProcessExecutorTester {
def exe(e:FutureExecutor,p:PiProcess,args:Any*) = awaitf(e.execute(p,args:Seq[Any]))
def await[A](f:Future[A]):A = try {
Await.result(f,15.seconds)
} catch {
case e:Throwable => {
System.out.println("=== RESULT FAILED! ===")
throw e
}
}

def awaitf[A](f:Future[Future[A]]):A = try {
Await.result(Await.result(f,15.seconds),15.seconds)
} catch {
case e:Throwable => {
System.out.println("=== RESULT FAILED! ===")
throw e
}
}
}
//trait ProcessExecutorTester {
// def exe(e:FutureExecutor,p:PiProcess,args:Any*) = awaitf(e.execute(p,args:Seq[Any]))
// def await[A](f:Future[A]):A = try {
// Await.result(f,15.seconds)
// } catch {
// case e:Throwable => {
// System.out.println("=== RESULT FAILED! ===")
// throw e
// }
// }
//
// def awaitf[A](f:Future[Future[A]]):A = try {
// Await.result(Await.result(f,15.seconds),15.seconds)
// } catch {
// case e:Throwable => {
// System.out.println("=== RESULT FAILED! ===")
// throw e
// }
// }
//}

0 comments on commit e538f41

Please sign in to comment.