Running an Akka Actor forever

I work on a project that runs as a single process and does many things. I had also been reading about Akka for a while and wanted to use it. This seemed like a very good use case where I could learn and add value to the existing project.

This is my first time working with Akka and Scala, so if my code does not follow recommended practices, please let me know; I would love to improve it.

For the purposes of this post, let’s consider a Runner (whom we expect to keep running forever) and a Coach (who looks after the Runner).

The relationship can be visualized as follows:

[Figure: initial hierarchy]

The Runner is expected to run a Race, which looks like this:

import scala.concurrent.Future

case object Start

case object Stop

case object StartWithFuture

trait Race {
  def start: Future[Any]
}

class Marathon extends Race {

  import scala.concurrent.ExecutionContext.Implicits.global

  override def start: Future[Any] = Future {
    println("running long job in future")
    for (i <- 1 to 3) {
      Thread.sleep(200)
    }
    throw new RuntimeException("MarathonRunner is tired")
  }
}

The Marathon is a Race which is supposed to run for a long time, but in our case we make it fail so that we can assert that when our Runner fails, the Coach starts it up again.

The failure can be pictured as follows:

[Figure: fail hierarchy]

Our Runner looks like

import akka.actor.Status.Failure
import akka.actor.{Actor, ActorLogging, Props}
import akka.event.LoggingReceive

object Runner {
  def props(race: Race) = Props(classOf[Runner], race)
}

class Runner(race: Race) extends Actor with ActorLogging {

  override def receive: Receive = LoggingReceive {
    case Start =>
      sender ! "OK"
      log.debug("running...")
      Thread.sleep(10)
      throw new RuntimeException("MarathonRunner is tired")

    case Failure(throwable) => throw throwable

    case Stop =>
      log.debug("stopping runner")
      context.stop(self)
  }
}

Our Runner is an Actor which performs a specific task when it receives a known message. In our case, the Runner only reacts to one of three messages: Start, Failure and Stop.
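The Runner shown here never calls race.start, and StartWithFuture is declared but not handled. One way the Failure case could come into play, sketched as an assumption rather than as the code from the repository, is to pipe the outcome of the Future back to the actor with akka.pattern.pipe. A failed Future then arrives as akka.actor.Status.Failure, and rethrowing it hands the error over to the parent's supervisorStrategy. FutureRunner is a hypothetical name; Race and StartWithFuture are repeated only so that the sketch compiles on its own.

```scala
import akka.actor.Status.Failure
import akka.actor.{Actor, ActorLogging, Props}
import akka.pattern.pipe

import scala.concurrent.Future

// Repeated from the post so the sketch is self-contained
case object StartWithFuture

trait Race {
  def start: Future[Any]
}

object FutureRunner {
  def props(race: Race): Props = Props(classOf[FutureRunner], race)
}

// Hypothetical variant of Runner (an assumption, not the author's code)
class FutureRunner(race: Race) extends Actor with ActorLogging {

  // pipeTo needs an ExecutionContext; the actor's own dispatcher is used here
  import context.dispatcher

  override def receive: Receive = {
    case StartWithFuture =>
      // On success the result is sent to self as a plain message,
      // on failure it is sent wrapped in Status.Failure(cause)
      race.start pipeTo self

    case Failure(throwable) =>
      // Rethrow inside the actor so that the supervisor gets to decide what happens
      throw throwable

    case result =>
      log.debug("race finished with {}", result)
  }
}
```

With this variant, sending StartWithFuture to an actor created from FutureRunner.props(new Marathon) would surface the "MarathonRunner is tired" exception to the supervisor once the Future fails, instead of the exception being lost on the Future's thread.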

Once our Runner is ready, we need to see what our Coach does with it:

import akka.actor.SupervisorStrategy.Restart
import akka.actor._
import akka.event.LoggingReceive

import scala.concurrent.duration._

case object StartWork

case object RestartRunner

object Coach {
  def props(): Props = Props[Coach]
}

class Coach() extends Actor with ActorLogging {

  val runner = context.actorOf(Runner.props(new Marathon).withDispatcher("my-pinned-dispatcher"), "runner")

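  // Restart the failing child at most 2 times within 5 seconds; after that it is stopped
  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 5 seconds) {
    case _: RuntimeException =>
      // here sender is the failed Runner, so Start is queued for it to pick up after the restart
      sender ! Start
      Restart
  }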

  override def receive = LoggingReceive {
    case StartWork => runner ! Start

    case RestartRunner =>
      log.debug("runner restarted, sending message to Run")
      self ! StartWork
  }
}

As we can see, the Coach is also an Actor which performs specific tasks based on the messages it receives. In addition, the Coach does the following things that our Runner Actor does not:

  1. Creating the Runner Actor: When the Coach Actor is initialized, it executes context.actorOf, which means it creates a new Runner and supervises it.

  2. Supervision: It defines a supervisorStrategy for its children, in this case the Runner. If the Runner fails with a RuntimeException, the Coach restarts it. OneForOneStrategy means the action is applied only to the child that failed. In this example, the Runner is restarted at most 2 times within 5 seconds and is stopped if it fails again.

  3. Starting the Actor on a PinnedDispatcher: The Runner runs on a dedicated thread because in my current project I wanted the legacy process to always be available.
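The post does not show the configuration behind "my-pinned-dispatcher", so the following application.conf is only a sketch of what it might look like. The dispatcher block follows the PinnedDispatcher form from the Akka documentation, and the debug settings are an assumption inferred from the DEBUG output in the logs.

```
# application.conf (assumed; not taken from the original project)
my-pinned-dispatcher {
  type = PinnedDispatcher
  executor = "thread-pool-executor"
}

akka {
  loglevel = "DEBUG"
  actor.debug {
    # log messages that pass through LoggingReceive
    receive = on
    # log started / restarting / restarted / stopped events
    lifecycle = on
  }
}
```

Because RaceEvent loads its configuration with ConfigFactory.load(), a file with this name on the classpath would be picked up without extra code.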

Once we have this relationship defined, we should be able to run the code. We do this by running the following:

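import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object RaceEvent extends App {
  // loads application.conf (and reference.conf) from the classpath
  val baseConfig = ConfigFactory.load()
  val system = ActorSystem.create("race", baseConfig)
  val coach = system.actorOf(Coach.props(), "coach")
  coach ! StartWork
}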

When I run this on my laptop, I see the following logs

[DEBUG] [06/03/2015 22:06:15.784] [main] [EventStream(akka://race)] logger log1-Logging$DefaultLogger started
[DEBUG] [06/03/2015 22:06:15.785] [main] [EventStream(akka://race)] Default Loggers started
[DEBUG] [06/03/2015 22:06:15.789] [race-akka.actor.default-dispatcher-3] [akka://race/system] now supervising Actor[akka://race/system/deadLetterListener#1505590548]
[DEBUG] [06/03/2015 22:06:15.791] [race-akka.actor.default-dispatcher-2] [akka://race/system/deadLetterListener] started (akka.event.DeadLetterListener@35aab39a)
[DEBUG] [06/03/2015 22:06:15.791] [race-akka.actor.default-dispatcher-4] [akka://race/user] now supervising Actor[akka://race/user/coach#-659795440]
[DEBUG] [06/03/2015 22:06:15.804] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach] started (com.learner.ahka.ruforever.Coach@523d23f0)
[DEBUG] [06/03/2015 22:06:15.804] [race-my-pinned-dispatcher-5] [akka://race/user/coach/runner] started (com.learner.ahka.ruforever.Runner@4eab5491)
[DEBUG] [06/03/2015 22:06:15.804] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach] now supervising Actor[akka://race/user/coach/runner#-1170580233]
[DEBUG] [06/03/2015 22:06:15.804] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach] received handled message StartWork
[DEBUG] [06/03/2015 22:06:15.805] [race-my-pinned-dispatcher-5] [akka://race/user/coach/runner] received handled message Start
[DEBUG] [06/03/2015 22:06:15.806] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach] received unhandled message OK
[DEBUG] [06/03/2015 22:06:15.807] [race-my-pinned-dispatcher-5] [akka://race/user/coach/runner] running...
[ERROR] [06/03/2015 22:06:15.825] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
	at com.learner.ahka.ruforever.Runner$$anonfun$receive$1.applyOrElse(Runner.scala:18)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at akka.event.LoggingReceive.apply(LoggingReceive.scala:62)
	at akka.event.LoggingReceive.apply(LoggingReceive.scala:50)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.event.LoggingReceive.applyOrElse(LoggingReceive.scala:50)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
	at com.learner.ahka.ruforever.Runner.aroundReceive(Runner.scala:11)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

[DEBUG] [06/03/2015 22:06:15.825] [race-my-pinned-dispatcher-5] [akka://race/user/coach/runner] restarting
[DEBUG] [06/03/2015 22:06:15.828] [race-my-pinned-dispatcher-5] [akka://race/user/coach/runner] restarted
[DEBUG] [06/03/2015 22:06:15.828] [race-my-pinned-dispatcher-5] [akka://race/user/coach/runner] received handled message Start
[DEBUG] [06/03/2015 22:06:15.828] [race-my-pinned-dispatcher-5] [akka://race/user/coach/runner] running...
[DEBUG] [06/03/2015 22:06:15.828] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach] received unhandled message OK
[ERROR] [06/03/2015 22:06:15.840] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
	at com.learner.ahka.ruforever.Runner$$anonfun$receive$1.applyOrElse(Runner.scala:18)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at akka.event.LoggingReceive.apply(LoggingReceive.scala:62)
	at akka.event.LoggingReceive.apply(LoggingReceive.scala:50)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.event.LoggingReceive.applyOrElse(LoggingReceive.scala:50)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
	at com.learner.ahka.ruforever.Runner.aroundReceive(Runner.scala:11)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

[DEBUG] [06/03/2015 22:06:15.840] [race-my-pinned-dispatcher-5] [akka://race/user/coach/runner] restarting
[DEBUG] [06/03/2015 22:06:15.840] [race-my-pinned-dispatcher-5] [akka://race/user/coach/runner] restarted
[DEBUG] [06/03/2015 22:06:15.840] [race-my-pinned-dispatcher-5] [akka://race/user/coach/runner] received handled message Start
[DEBUG] [06/03/2015 22:06:15.840] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach] received unhandled message OK
[DEBUG] [06/03/2015 22:06:15.840] [race-my-pinned-dispatcher-5] [akka://race/user/coach/runner] running...
[ERROR] [06/03/2015 22:06:15.851] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
	at com.learner.ahka.ruforever.Runner$$anonfun$receive$1.applyOrElse(Runner.scala:18)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at akka.event.LoggingReceive.apply(LoggingReceive.scala:62)
	at akka.event.LoggingReceive.apply(LoggingReceive.scala:50)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.event.LoggingReceive.applyOrElse(LoggingReceive.scala:50)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
	at com.learner.ahka.ruforever.Runner.aroundReceive(Runner.scala:11)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

[INFO] [06/03/2015 22:06:15.855] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] Message [com.learner.ahka.ruforever.Start$] from Actor[akka://race/user/coach#-659795440] to Actor[akka://race/user/coach/runner#-1170580233] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[DEBUG] [06/03/2015 22:06:15.855] [race-my-pinned-dispatcher-5] [akka://race/user/coach/runner] stopped

The codebase for this is located on GitHub and also includes tests.
