Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improving PiEventHandlers #35

Merged
merged 5 commits into from
Jul 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 98 additions & 99 deletions src/com/workflowfm/pew/execution/AkkaExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,179 +10,178 @@ import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{Failure, Success}

import java.util.UUID

class AkkaExecutor (
store:PiInstanceStore[Int],
store:PiInstanceStore[UUID],
processes:PiProcessStore
)(
implicit val system: ActorSystem,
override implicit val executionContext: ExecutionContext = ExecutionContext.global,
implicit val timeout:FiniteDuration = 10.seconds
) extends SimulatorExecutor[Int] with PiObservable[Int] {
) extends SimulatorExecutor[UUID] with PiObservable[UUID] {

def this(store:PiInstanceStore[Int], l:PiProcess*)
def this(store:PiInstanceStore[UUID], l:PiProcess*)
(implicit system: ActorSystem, context: ExecutionContext, timeout:FiniteDuration) =
this(store,SimpleProcessStore(l :_*))

def this(system: ActorSystem, context: ExecutionContext, timeout:FiniteDuration,l:PiProcess*) =
this(SimpleInstanceStore[Int](),SimpleProcessStore(l :_*))(system,context,timeout)
this(SimpleInstanceStore[UUID](),SimpleProcessStore(l :_*))(system,context,timeout)

def this(l:PiProcess*)(implicit system: ActorSystem) =
this(SimpleInstanceStore[Int](),SimpleProcessStore(l :_*))(system,ExecutionContext.global,10.seconds)
this(SimpleInstanceStore[UUID](),SimpleProcessStore(l :_*))(system,ExecutionContext.global,10.seconds)


val execActor = system.actorOf(AkkaExecutor.execprops(store,processes))
implicit val tOut = Timeout(timeout)
implicit val tOut = Timeout(timeout)

override def simulationReady = Await.result(execActor ? AkkaExecutor.SimReady,timeout).asInstanceOf[Boolean]

override protected def init(process:PiProcess,args:Seq[PiObject]):Future[Int] =
execActor ? AkkaExecutor.Init(process,args) map (_.asInstanceOf[Int])
override protected def start(id:Int) = execActor ! AkkaExecutor.Start(id)
override protected def init(process:PiProcess,args:Seq[PiObject]):Future[UUID] =
execActor ? AkkaExecutor.Init(process,args) map (_.asInstanceOf[UUID])

override protected def start(id:UUID) = execActor ! AkkaExecutor.Start(id)

override def subscribe(handler:PiEventHandler[Int]):Future[PiSwitch] = (execActor ? AkkaExecutor.Subscribe(handler)).mapTo[PiSwitch]
override def subscribe(handler:PiEventHandler[UUID]):Future[PiSwitch] = (execActor ? AkkaExecutor.Subscribe(handler)).mapTo[PiSwitch]

}

object AkkaExecutor {
case class Init(p:PiProcess,args:Seq[PiObject])
case class Start(id:Int)
case class Result(id:Int,ref:Int,res:MetadataAtomicProcess.MetadataAtomicResult)
case class Error(id:Int,ref:Int,ex:Throwable)
case class Start(id:UUID)
case class Result(id:UUID,ref:Int,res:MetadataAtomicProcess.MetadataAtomicResult)
case class Error(id:UUID,ref:Int,ex:Throwable)

case class ACall(id:Int,ref:Int,p:MetadataAtomicProcess,args:Seq[PiObject],actor:ActorRef)
case class ACall(id:UUID,ref:Int,p:MetadataAtomicProcess,args:Seq[PiObject],actor:ActorRef)
case object AckCall

case class AFuture(f:Future[Any])

case object Ping
case object SimReady

case class Subscribe(handler:PiEventHandler[Int])
case class Subscribe(handler:PiEventHandler[UUID])

def atomicprops(implicit context: ExecutionContext = ExecutionContext.global): Props = Props(new AkkaAtomicProcessExecutor())
def execprops(store:PiInstanceStore[Int], processes:PiProcessStore)(implicit system: ActorSystem, exc: ExecutionContext): Props = Props(new AkkaExecActor(store,processes))
def execprops(store:PiInstanceStore[UUID], processes:PiProcessStore)(implicit system: ActorSystem, exc: ExecutionContext): Props = Props(new AkkaExecActor(store,processes))
}

class AkkaExecActor(
var store:PiInstanceStore[Int],
var store:PiInstanceStore[UUID],
processes:PiProcessStore
)(
override implicit val system: ActorSystem,
implicit val executionContext: ExecutionContext = ExecutionContext.global
) extends Actor with PiStream[Int] {

var ctr:Int = 0
) extends Actor with PiStream[UUID] {

def init(p:PiProcess,args:Seq[PiObject]):Int = {
val inst = PiInstance(ctr,p,args:_*)
store = store.put(inst)
ctr = ctr + 1
ctr-1
def init(p:PiProcess,args:Seq[PiObject]):UUID = {
val id = java.util.UUID.randomUUID
val inst = PiInstance(id,p,args:_*)
store = store.put(inst)
id
}

def start(id:Int):Unit = store.get(id) match {
def start(id:UUID):Unit = store.get(id) match {
case None => publish(PiFailureNoSuchInstance(id))
case Some(inst) => {
publish(PiEventStart(inst))
val ni = inst.reduce
if (ni.completed) ni.result match {
case None => {
publish(PiFailureNoResult(ni))
store = store.del(id)
}
case Some(res) => {
publish(PiEventResult(ni, res))
store = store.del(id)
}
case None => {
publish(PiFailureNoResult(ni))
store = store.del(id)
}
case Some(res) => {
publish(PiEventResult(ni, res))
store = store.del(id)
}
} else {
val (toCall,resi) = ni.handleThreads(handleThread(ni))
val futureCalls = toCall flatMap (resi.piFutureOf)
//System.err.println("*** [" + ctr + "] Updating state after init")
store = store.put(resi)
(toCall zip futureCalls) map runThread(resi)
val (toCall,resi) = ni.handleThreads(handleThread(ni))
val futureCalls = toCall flatMap (resi.piFutureOf)
//System.err.println("*** [" + ctr + "] Updating state after init")
store = store.put(resi)
(toCall zip futureCalls) map runThread(resi)
}
}
}

final def postResult(id:Int,ref:Int, res:MetadataAtomicProcess.MetadataAtomicResult):Unit = {
final def postResult(id:UUID, ref:Int, res:MetadataAtomicProcess.MetadataAtomicResult):Unit = {
publish(PiEventReturn(id,ref,PiObject.get(res._1),res._2))
store.get(id) match {
case None => publish(PiFailureNoSuchInstance(id))
case Some(i) =>
case Some(i) =>
if (i.id != id) System.err.println("*** [" + id + "] Different instance ID encountered: " + i.id) // This should never happen. We trust the Instance Store!
else {
//System.err.println("*** [" + id + "] Running!")
val ni = i.postResult(ref, res._1).reduce
if (ni.completed) ni.result match {
case None => {
publish(PiFailureNoResult(ni))
store = store.del(ni.id)
}
case Some(res) => {
publish(PiEventResult(ni, res))
store = store.del(ni.id)
}
} else {
val (toCall,resi) = ni.handleThreads(handleThread(ni))
val futureCalls = toCall flatMap (resi.piFutureOf)
//System.err.println("*** [" + i.id + "] Updating state after: " + ref)
store = store.put(resi)
(toCall zip futureCalls) map runThread(resi)
}
if (ni.completed) ni.result match {
case None => {
publish(PiFailureNoResult(ni))
store = store.del(ni.id)
}
case Some(res) => {
publish(PiEventResult(ni, res))
store = store.del(ni.id)
}
} else {
val (toCall,resi) = ni.handleThreads(handleThread(ni))
val futureCalls = toCall flatMap (resi.piFutureOf)
//System.err.println("*** [" + i.id + "] Updating state after: " + ref)
store = store.put(resi)
(toCall zip futureCalls) map runThread(resi)
}
}
}
}
}

def handleThread(i:PiInstance[Int])(ref:Int,f:PiFuture):Boolean = {
def handleThread(i:PiInstance[UUID])(ref:Int,f:PiFuture):Boolean = {
//System.err.println("*** [" + id + "] Checking thread: " + ref + " (" + f.fun + ")")
f match {
case PiFuture(name, outChan, args) => i.getProc(name) match {
case None => {
publish(PiFailureUnknownProcess(i, name))
false
}
case Some(p:MetadataAtomicProcess) => true
case Some(p:CompositeProcess) => { // TODO this should never happen!
publish(PiFailureAtomicProcessIsComposite(i, name))
false
case PiFuture(name, outChan, args) => i.getProc(name) match {
case None => {
publish(PiFailureUnknownProcess(i, name))
false
}
case Some(p:MetadataAtomicProcess) => true
case Some(p:CompositeProcess) => { // TODO this should never happen!
publish(PiFailureAtomicProcessIsComposite(i, name))
false
}
}
}
} }
//
} }
//

def runThread(i:PiInstance[Int])(t:(Int,PiFuture)):Unit = {
def runThread(i:PiInstance[UUID])(t:(Int,PiFuture)):Unit = {
//System.err.println("*** [" + id + "] Running thread: " + t._1 + " (" + t._2.fun + ")")
t match {
case (ref,PiFuture(name, outChan, args)) => i.getProc(name) match {
case None => {
// This should never happen! We already checked!
System.err.println("*** [" + i.id + "] ERROR *** Unable to find process: " + name + " even though we checked already")
}
case Some(p:MetadataAtomicProcess) => {
implicit val tOut = Timeout(1.second)
val objs = args map (_.obj)
try {
publish(PiEventCall(i.id,ref,p,objs))
// TODO Change from ! to ? to require an acknowledgement
system.actorOf(AkkaExecutor.atomicprops()) ! AkkaExecutor.ACall(i.id,ref,p,objs,self)
} catch {
case _:Throwable => Unit //TODO specify timeout exception here! - also print a warning
case (ref,PiFuture(name, outChan, args)) => i.getProc(name) match {
case None => {
// This should never happen! We already checked!
System.err.println("*** [" + i.id + "] ERROR *** Unable to find process: " + name + " even though we checked already")
}
case Some(p:MetadataAtomicProcess) => {
implicit val tOut = Timeout(1.second)
val objs = args map (_.obj)
try {
publish(PiEventCall(i.id,ref,p,objs))
// TODO Change from ! to ? to require an acknowledgement
system.actorOf(AkkaExecutor.atomicprops()) ! AkkaExecutor.ACall(i.id,ref,p,objs,self)
} catch {
case _:Throwable => Unit //TODO specify timeout exception here! - also print a warning
}
}
case Some(p:CompositeProcess) => {// This should never happen! We already checked!
publish(PiFailureAtomicProcessIsComposite(i, name))
}
}
case Some(p:CompositeProcess) => {// This should never happen! We already checked!
publish(PiFailureAtomicProcessIsComposite(i, name))
}
}
} }

} }

def simulationReady():Boolean = store.simulationReady

def receive = {
case AkkaExecutor.Init(p,args) => sender() ! init(p,args)
case AkkaExecutor.Start(id) => start(id)
case AkkaExecutor.Result(id,ref,res) => postResult(id,ref,res)
case AkkaExecutor.Result(id,ref,res) => postResult(id,ref,res)
case AkkaExecutor.Error(id,ref,ex) => {
publish( PiFailureAtomicProcessException(id,ref,ex) )
store = store.del(id)
Expand All @@ -197,12 +196,12 @@ class AkkaExecActor(
}

class AkkaAtomicProcessExecutor(implicit val exc: ExecutionContext = ExecutionContext.global) extends Actor { //(executor:ActorRef,p:PiProcess,args:Seq[PiObject])
def receive = {
def receive = {
case AkkaExecutor.ACall(id,ref,p,args,actor) => {
//System.err.println("*** [" + id + "] Calling atomic process: " + p.name + " ref:" + ref)
p.runMeta(args).onComplete{
case Success(res) => actor ! AkkaExecutor.Result(id,ref,res)
case Failure(ex) => actor ! AkkaExecutor.Error(id,ref,ex)
p.runMeta(args).onComplete{
case Success(res) => actor ! AkkaExecutor.Result(id,ref,res)
case Failure(ex) => actor ! AkkaExecutor.Error(id,ref,ex)
}
actor ! AkkaExecutor.AckCall
}
Expand Down
4 changes: 2 additions & 2 deletions src/com/workflowfm/pew/execution/ProcessExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ trait ProcessExecutor[KeyT] { this:PiObservable[KeyT] =>
* @return A Future with the result of the executed process
*/
def execute(process:PiProcess,args:Seq[Any]):Future[Any] =
call(process,args,new ResultHandlerFactory[KeyT]({ id => s"[$id]"})) flatMap (_.future)
call(process,args,new ResultHandlerFactory[KeyT]) flatMap (_.future)
}

object ProcessExecutor {
Expand All @@ -110,7 +110,7 @@ trait SimulatorExecutor[KeyT] extends ProcessExecutor[KeyT] { this:PiObservable[
* @return A Future with the result of the executed process
*/
def simulate(process:PiProcess,args:Seq[Any],timeout:FiniteDuration=10.seconds):Future[Any] = {
val f = call(process,args,new ResultHandlerFactory[KeyT]({ id => s"[$id]"}))
val f = call(process,args,new ResultHandlerFactory[KeyT])
val handler = Await.result(f, timeout)
handler.future
}
Expand Down
2 changes: 1 addition & 1 deletion src/com/workflowfm/pew/metrics/Measure.scala
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class MetricsAggregator[KeyT] {
* @param name a unique name to identify the instance of [[com.workflowfm.pew.stream.PiEventHandler]]
* @param timeFn the [[PiMetadata]] key to retrieve timing information (the recorder system time by default)
*/
class MetricsHandler[KeyT](override val name: String, timeFn: PiMetadata.Key[Long] = PiMetadata.SystemTime )
class MetricsHandler[KeyT](timeFn: PiMetadata.Key[Long] = PiMetadata.SystemTime )
extends MetricsAggregator[KeyT] with PiEventHandler[KeyT] {

/** Converts [[PiEvent]]s to metrics updates. */
Expand Down
13 changes: 8 additions & 5 deletions src/com/workflowfm/pew/stream/PiEventHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,21 @@ import java.text.SimpleDateFormat
import scala.collection.immutable.Queue

// Return true if the handler is done and needs to be unsubscribed.

/** A listener for [[PiEvent]]s. */
trait PiEventHandler[KeyT] extends (PiEvent[KeyT]=>Boolean) {
def name:String
/** Compose with another handler. */
def and(h:PiEventHandler[KeyT]) = MultiPiEventHandler(this,h)
}

/** A factory that generates a handler that is related to a particular workflow ID.
* The ID is generated by the [[com.workflowfm.pew.execution.ProcessExecutor]] so we do not know it in advance.
*/
trait PiEventHandlerFactory[T,H <: PiEventHandler[T]] {
def build(id:T):H
}

class PrintEventHandler[T](override val name:String) extends PiEventHandler[T] {
/** Example of a [[PiEventHandler]] that simply prints a string representation of the event to `System.err`. */
class PrintEventHandler[T] extends PiEventHandler[T] {
val formatter = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss.SSS")
override def apply(e:PiEvent[T]) = {
val time = formatter.format(e.rawTime)
Expand All @@ -26,9 +30,8 @@ class PrintEventHandler[T](override val name:String) extends PiEventHandler[T] {
}
}


/** A [[PiEventHandler]] consisting of a queue of multiple handlers. */
case class MultiPiEventHandler[T](handlers:Queue[PiEventHandler[T]]) extends PiEventHandler[T] {
override def name = handlers map (_.name) mkString(",")
override def apply(e:PiEvent[T]) = handlers map (_(e)) forall (_ == true)
override def and(h:PiEventHandler[T]) = MultiPiEventHandler(handlers :+ h)
}
Expand Down
Loading