Monday, May 18, 2020

Akka - Evolution of Multithreading

Over the past decade, with the rise of mobile applications, distributed microservices and cloud infrastructure services, applications have needed to be highly responsive, provide high throughput and use system resources efficiently. Many real-time systems, ranging from financial applications to games, cannot wait for a single-threaded process to complete, and resource-intensive systems would take a long time to finish their tasks without parallelization. Multithreading takes advantage of the underlying hardware by running multiple threads, making the application more responsive and efficient.

Multithreading, which relies on context switching, gives rise to many problems such as thread management, access to shared resources, race conditions and deadlocks. Several multithreading concepts were developed to address these problems. A thread pool decouples task submission from execution and removes the need to create threads manually. It consists of a fixed number of homogeneous worker threads (the pool size) that are assigned tasks to execute and are returned to the pool once a task is finished. Locks are a synchronization technique used to limit access to a resource by multiple threads. A mutex, in particular, guards shared data by allowing only a single thread to access it at a time. Such locking seriously limits concurrency, because blocked caller threads perform no work while the CPU does the heavy lifting of suspending them and restoring them later. The ExecutorService was introduced as part of the Concurrency API in Java to provide a higher-level abstraction over threads. It manages thread creation, maintains a thread pool under the hood, and is capable of running multiple asynchronous tasks. Callable tasks were added so that a task can return a result to the caller through a Future object. These and many other improvements help to simplify concurrent code, but they are not enough to avoid complex thread synchronization.
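To make the thread pool, Callable and Future ideas above concrete, here is a minimal sketch written in Scala against the standard java.util.concurrent API. The object name ExecutorDemo and the helper squaresInParallel are hypothetical names chosen only for this illustration.

```scala
import java.util.concurrent.{Callable, Executors, Future, TimeUnit}

object ExecutorDemo {
  // Hypothetical helper: squares each input on a pool of `poolSize` worker threads.
  def squaresInParallel(inputs: Seq[Int], poolSize: Int = 4): Seq[Int] = {
    val pool = Executors.newFixedThreadPool(poolSize)
    try {
      // Submitting a Callable returns immediately with a Future for its result.
      val futures: Seq[Future[Int]] = inputs.map { n =>
        pool.submit(new Callable[Int] {
          override def call(): Int = n * n
        })
      }
      // Future.get blocks the caller until the corresponding task has finished.
      futures.map(_.get(5, TimeUnit.SECONDS))
    } finally {
      pool.shutdown() // let the worker threads exit once the tasks are done
    }
  }

  def main(args: Array[String]): Unit =
    println(squaresInParallel(Seq(1, 2, 3, 4)))
}
```

Note that the caller still blocks in Future.get, which is the kind of waiting that the message-driven model described later tries to avoid.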

Concurrency means that multiple tasks run in overlapping time periods, while parallelism means that a task is split up into smaller subtasks which are processed at the same time. Concurrent tasks are stateful, often with complex interactions, while parallelizable tasks are stateless, do not interact, and are composable. A concurrency model specifies how the threads in the system collaborate to complete the tasks they are given. The important aspect of a concurrency model is whether the threads are designed to share a common state or whether each thread has its own state, isolated from other threads. When threads share state by accessing a shared object, problems like race conditions and deadlocks may occur. When threads have separate state, on the other hand, they need to communicate either by exchanging immutable objects or by sending copies of objects (or data) to each other. This ensures that no two threads write to the same object, thus avoiding the concurrency problems faced with shared state.

The parallel worker model, which is the most common, has a delegator that distributes the incoming jobs to different workers. Each worker completes the entire job, working in parallel and running in a different thread. The parallel worker model works well for isolated jobs but becomes complex when workers need to access shared data.

The event-driven model (reactive model) is one where each worker performs part of the job and delegates the remainder to another worker. Each worker runs in its own thread and shares no state with other workers. Jobs may even be forwarded to more than one worker for concurrent processing. The workers react to events occurring in the system, either received from the outside world or emitted by other workers. Since workers know that no other threads modify their data, the workers can be stateful, which can make them faster than stateless workers. Akka is one such event-driven concurrency model.



Akka is a toolkit and runtime for building highly concurrent, distributed, and fault-tolerant applications on the JVM. Akka creates a layer between the actors and the underlying system so that an actor's only job is to process messages. All the complexity of creating and scheduling threads, receiving and dispatching messages, and handling race conditions and synchronization is relegated to the framework, which handles it transparently. The Akka actor model is a concurrency model which avoids concurrent access to mutable state and uses asynchronous communication mechanisms to provide concurrency. Akka encourages the push model using messages and a single shared queue, rather than the pull model where each worker processes from its own queue.

The following characteristics of Akka allow you to solve difficult concurrency and scalability challenges in an intuitive way:
  • Event-driven model: Actors perform work in response to messages. Communication between Actors is asynchronous, allowing Actors to send messages and continue their own work without blocking to wait for a reply.
  • Strong isolation principles: Actors don't have any public API methods which can be invoked. Instead, an actor's public API is defined through the messages that it handles. This prevents any sharing of state between Actors; the only way to observe another actor's state is to send it a message asking for it.
  • Location transparency: The system constructs Actors from a factory and returns references to the instances. Because location doesn’t matter, Actor instances can start, stop, move, and restart to scale up and down as well as recover from unexpected failures. It enables the actors to know the origin of the messages they receive. The sender of the message may exist in the same JVM or another JVM, thus allowing Akka actors to run in a distributed environment without any special code.
  • Lightweight: Each instance consumes only a few hundred bytes, which realistically allows millions of concurrent Actors to exist in a single application.

Note: Lightbend announced the release of Akka 2.6 in Nov 2019 with a new Typed Actor API ("Akka Typed"). In the typed API, each actor needs to declare which message type it is able to handle, and the type system enforces that only messages of this type can be sent to the actor. The classic Akka APIs are still supported, even though it is recommended to use the new Akka Typed API for new projects. Currently only the classic Akka APIs are used in this post; it will be updated as the new typed APIs are adopted.

Actors and Actor System

An actor is a container for State, Behavior, a Mailbox, Child Actors and a Supervisor Strategy. An actor object needs to be shielded from the outside in order to benefit from the actor model. Hence actors are represented to the outside using actor references, which are objects that can be passed around freely and without restriction.

ActorSystem
All actors form part of a hierarchy known as the actor system. The actor system is a logical organization of actors into a hierarchical structure, and it provides the infrastructure through which actors interact with one another. It is a heavyweight structure which allocates a number of threads, so it is recommended to create only one ActorSystem per application. The ActorSystem manages the life cycle of the actors within the system and supervises them. An actor that creates another actor becomes the parent of the newly created child, and the list of children is maintained within the parent's context.

Akka creates two built-in guardian actors in the system before creating any other actor. The first actor created by the actor system is called the root guardian. It is the parent of all actors in the system, and the last one to stop when the actor system is terminated. The second actor created by the actor system is called the system guardian, which owns the system namespace. Akka or other libraries built on top of Akka may create actors in the system namespace. The user guardian is the top-level actor that we can provide to start all other actors in the application. Actors created using system.actorOf() are children of the user guardian actor, and the user guardian determines how these top-level user-created actors are supervised. The root guardian is the parent and supervisor of both the user guardian and the system guardian.




Messaging and Mailbox
Actors communicate exclusively by exchanging messages. Every actor has an exclusive address and a mailbox through which it can receive messages from other actors. Mailbox messages are processed by the actor in consecutive order. There are multiple mailbox implementations with the default implementation being FIFO.

Akka ensures that each instance of an actor runs in its own lightweight thread, shielding it from the rest of the system, and that messages are processed one at a time. Messages are required to be immutable: several actors can potentially access the same message object concurrently, so immutability is what avoids race conditions and unexpected behavior. Hence actors can be implemented without explicitly worrying about concurrency and synchronized access using locks. When a message is processed, it is matched against the current behavior of the actor, which is the function that defines the actions to be taken in reaction to the message at that point in time. When a message is sent to an actor which does not exist or is no longer running, it goes to a special Dead Letter actor (/deadLetters).

State
An actor contains many instance variables to maintain state while processing multiple messages. Each actor is provided with the following useful information for performing its tasks via the Akka Actor API:
  • sender: an ActorRef to the sender of the message currently being processed
  • context: information and methods relating to the context within which the actor is running
  • supervisorStrategy: defines the strategy to be used for recovering from errors
  • self: the ActorRef for the actor itself
Behind the scenes Akka will run sets of actors on sets of real threads, where typically many actors share one thread, and subsequent invocations of one actor may end up being processed on different threads. Akka ensures that this implementation detail does not affect the single-threadedness of handling the actor’s state. Since the internal state is vital to an actor's operations, when the actor fails and is restarted by its supervisor, the state will be created from scratch.
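As a rough illustration of actor state, the sketch below keeps a counter in a private variable and uses sender() to reply. It assumes the classic Akka 2.6 API; the names CounterActor, Increment and GetCount are hypothetical and exist only for this example.

```scala
import akka.actor.Actor

// Hypothetical message types for this sketch.
case object Increment
case object GetCount

class CounterActor extends Actor {
  // Mutable state is safe here: only this actor touches it,
  // and it handles one message at a time.
  private var count = 0

  def receive: Receive = {
    case Increment => count += 1
    case GetCount  => sender() ! count // reply to the sender of the current message
  }
}
```

If the supervisor restarts this actor, a fresh instance is created and count starts again from 0.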

Actor Lifecycle

An actor is a stateful resource that has to be explicitly started and stopped. An actor can create, or spawn, an arbitrary number of child actors, which in turn can spawn children of their own, thus forming an actor hierarchy. The ActorSystem hosts the hierarchy, and there can be only one root actor, the actor at the top of the hierarchy of the ActorSystem. Each actor's ActorContext holds the contextual information for the actor and the current message, and it is what the actor uses to spawn and stop its children.

Every Actor that is created must also be explicitly destroyed, even when it is no longer referenced. The lifecycle of a child actor is tied to its parent: a child can stop itself or be stopped by its parent at any time, but it can never outlive its parent. In other words, whenever an actor is stopped, all of its children are recursively stopped. A top-level actor can be stopped using the stop method of the ActorSystem. Stopping a child actor is done by calling context.stop(childRef) from the parent.

An Actor has the following life-cycle methods:
  • Actor's constructor: An actor’s constructor is called just like any other Scala class constructor, when an instance of the class is first created.
  • preStart: It is only called once directly during the initialization of the first instance, i.e. at creation of its ActorRef. In the case of restarts, preStart() is called from postRestart(), therefore if not overridden, preStart() is called on every restart.
  • postStop: It is called just after the actor has been stopped. No messages are processed after this point. It can be used to perform any needed cleanup work. Akka guarantees that postStop runs after message queuing has been disabled for the actor. The postStop hooks of the children run before the postStop hook of the parent.
  • preRestart: According to the Akka documentation, when an actor is restarted, the old actor is informed of the process when preRestart is called with the exception that caused the restart, and the message that triggered the exception. The message may be None if the restart was not caused by processing a message.
  • postRestart: The postRestart method of the new actor is invoked with the exception that caused the restart. In the default implementation, the preStart method is called.
If initialization needs to occur every time an actor is instantiated, then constructor initialization is used. On the contrary, if initialization needs to occur only for the first instance of the actor that is created, then the initialization is added to preStart and postRestart is overridden so that it does not call the preStart method. Below is an example implementing all the actor lifecycle methods.
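import akka.actor.{Actor, ActorSystem, Props}

class RootActor extends Actor {
  def receive = {
    case msg =>
      println("Message received: " + msg)
      10 / 0 // deliberately throws ArithmeticException so that the actor is restarted
  }

  // Called when the actor is started (and, by default, again from postRestart)
  override def preStart(): Unit = {
    super.preStart()
    println("preStart method is called")
  }

  // Called after the actor has been stopped
  override def postStop(): Unit = {
    super.postStop()
    println("postStop method is called")
  }

  // Called on the failed instance before it is replaced
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    super.preRestart(reason, message)
    println("preRestart method is called")
    println("Reason: " + reason)
  }

  // Called on the fresh instance after the restart
  override def postRestart(reason: Throwable): Unit = {
    super.postRestart(reason)
    println("postRestart is called")
    println("Reason: " + reason)
  }
}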
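The "initialize only once" variant described before the example can be sketched as follows. This is a minimal sketch assuming the classic Akka API; InitOnceActor is a hypothetical name.

```scala
import akka.actor.Actor

class InitOnceActor extends Actor {
  // Runs when the actor is first started. It does not run again after a
  // restart, because postRestart below no longer calls it.
  override def preStart(): Unit =
    println("one-time initialization")

  // The default postRestart calls preStart(); overriding it without that
  // call keeps the one-time initialization from being repeated.
  override def postRestart(reason: Throwable): Unit = ()

  def receive: Receive = {
    case msg => println(s"received: $msg")
  }
}
```

Anything that must be set up again for every new instance would still go in the constructor body.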

Supervision and Monitoring

In an actor system, each actor is the supervisor of its children. When an actor creates children for delegating its sub-tasks, it will automatically supervise them. If an actor fails to handle a message, it suspends itself and all of its children and sends a message, usually in the form of an exception, to its supervisor. Once an actor terminates, i.e. fails in a way which is not handled by a restart, stops itself or is stopped by its supervisor, it will free up its resources, draining all remaining messages from its mailbox into the system's dead letter mailbox. The mailbox is then replaced within the actor reference with a system mailbox, redirecting all new messages into the drain. Actors cannot be orphaned or attached to supervisors from the outside, which might otherwise catch them unawares.

When a parent receives the failure signal from its child, depending on the nature of the failure, the parent chooses one of the following options:
  • Resume: Parent resumes the child actor, keeping its internal state.
  • Restart: Parent restarts the child actor, clearing its internal state.
  • Stop: Stop the child actor permanently.
  • Escalate: Escalate the failure by failing itself and propagating the failure to its parent.
A restart is carried out by creating a new instance of the underlying Actor class and replacing the failed instance with the fresh one inside the child's ActorRef. The new actor then resumes processing its mailbox, without processing the message during which the failure occurred.




The supervision strategy is typically defined by the parent actor when it spawns a child actor. There are two types of supervision strategy, namely the one-for-one strategy and the one-for-all strategy. The one-for-one strategy applies the supervision directive only to the failed child, while the one-for-all strategy applies it to the failed child and all of its siblings. In the classic API used in this post, the default supervisor strategy is a one-for-one strategy that restarts the failed child for most exceptions; in Akka Typed the default is instead to stop the child. Below is an example One-for-One Strategy implementation.
import akka.actor.SupervisorStrategy.{Escalate, Restart, Resume, Stop}
import akka.actor.{Actor, OneForOneStrategy, Props}
import scala.concurrent.duration._

class Supervisor extends Actor {

  // Allow at most 10 restarts of a child within one minute
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: ArithmeticException      => Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception                => Escalate
    }

  // Creates a child from the received Props and replies with its ActorRef
  def receive = {
    case p: Props => sender() ! context.actorOf(p)
  }
}
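To see the difference between Resume and Restart, the following sketch sends failures to a supervised child and reads its state back. It assumes the classic Akka 2.6 API with the ask pattern; FragileChild, DemoSupervisor and SupervisionDemo are hypothetical names, and the blocking Await calls are there only to keep the demo short.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.{Restart, Resume}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical child: stores a number and fails when it is sent an exception.
class FragileChild extends Actor {
  private var state = 0
  def receive: Receive = {
    case ex: Exception => throw ex
    case x: Int        => state = x
    case "get"         => sender() ! state
  }
}

class DemoSupervisor extends Actor {
  override val supervisorStrategy: OneForOneStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: ArithmeticException  => Resume  // the child keeps its state
      case _: NullPointerException => Restart // the child starts from scratch
    }

  private val child: ActorRef = context.actorOf(Props[FragileChild](), "child")

  // Pass everything on to the child, keeping the original sender.
  def receive: Receive = { case msg => child.forward(msg) }
}

object SupervisionDemo {
  // Returns the child's state after a Resume and after a Restart.
  def run(): (Int, Int) = {
    val system = ActorSystem("supervision-demo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val sup = system.actorOf(Props[DemoSupervisor](), "supervisor")
      sup ! 42
      sup ! new ArithmeticException("resume me")
      val afterResume = Await.result((sup ? "get").mapTo[Int], 3.seconds)
      sup ! new NullPointerException("restart me")
      val afterRestart = Await.result((sup ? "get").mapTo[Int], 3.seconds)
      (afterResume, afterRestart)
    } finally system.terminate()
  }
}
```

With this setup the state should survive the resumed failure (42) and be reset by the restart (0), since the restarted instance is created from scratch.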

Monitoring is used to tie one actor to another so that it may react to the other actor’s termination, in contrast to supervision, which reacts to failure. Since actor creation and restarts are not visible outside their supervisors, the only state change available for monitoring is the transition from alive to dead. Lifecycle monitoring is implemented using a Terminated message that is received by the monitoring actor, where the default behavior, if the message is not otherwise handled, is to throw a special DeathPactException. An actor registers to receive Terminated messages by invoking ActorContext.watch(targetActorRef). Terminated messages are delivered regardless of the order in which the actors terminate. Monitoring makes it possible to recreate an actor, or to schedule its recreation at a later time as a retry mechanism, thus providing an alternative to having the supervisor restart the actor. The example below shows a parent actor monitoring its child for termination using the watch() method.
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.OneForOneStrategy
import akka.actor.PoisonPill
import akka.actor.Props
import akka.actor.Terminated
import akka.actor.SupervisorStrategy.Escalate

class DeathPactParentActor extends Actor with ActorLogging {

  override val supervisorStrategy = OneForOneStrategy() {
    case _: Exception => {
      log.info("The exception is ducked by the Parent Actor. Escalating to TopLevel Actor")
      Escalate
    }
  }

  def receive = {
    case "start" => {
      val child = context.actorOf(Props[DeathPactChildActor])
      context.watch(child) // Watches but doesn't handle the Terminated message, so a DeathPactException is thrown
      child ! "stop"
    }
    // case Terminated(actor) => log.error(s"$actor died")   // Terminated message deliberately not handled
  }
}

class DeathPactChildActor extends Actor with ActorLogging {
  def receive = {
    case "stop" => {
      log.info("Actor going to be terminated")
      self ! PoisonPill
    }
  }
}
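For contrast, here is a minimal sketch of a parent that does handle the Terminated message, so that no DeathPactException is thrown. It assumes the classic Akka API; WatchingParent and ShortLivedChild are hypothetical names.

```scala
import akka.actor.{Actor, ActorLogging, ActorRef, PoisonPill, Props, Terminated}

// Hypothetical child that stops itself when told to.
class ShortLivedChild extends Actor {
  def receive: Receive = {
    case "stop" => self ! PoisonPill
  }
}

class WatchingParent extends Actor with ActorLogging {
  private val child: ActorRef = context.actorOf(Props[ShortLivedChild](), "child")
  context.watch(child) // register for the child's Terminated message

  def receive: Receive = {
    case "start" =>
      child ! "stop"
    case Terminated(ref) =>
      // Handling Terminated here is what avoids the DeathPactException.
      log.info("{} has terminated", ref.path)
  }
}
```

The Terminated handler is also a natural place to recreate the child or to schedule a retry.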

Creating Actors

Akka creates Actor instances using factory methods which return a reference to the actor's instance. Props is a configuration class that specifies immutable options for the creation of an Actor, such as its creator, routing and deployment information. Actors are created by passing a Props configuration object into the actorOf factory method, which is available on ActorSystem and ActorContext. The call to actorOf returns an instance of ActorRef, which is a handle to the actor instance and the only way to interact with it. It is recommended to provide factory methods on the companion object of each Actor, which helps to keep the creation of suitable Props close to the actor definition. Actors are automatically started asynchronously when created. It is not recommended to declare one actor within another, as it breaks actor encapsulation.

Actors are implemented by extending the Actor base trait and implementing the receive method. The receive method defines a series of case statements that determine which messages the Actor handles, using standard Scala pattern matching. The Akka Actor receive message loop is exhaustive: every message which the actor can accept must be pattern matched, otherwise an akka.actor.UnhandledMessage is published to the ActorSystem's EventStream. The result of the receive method is a partial function object, which is stored within the actor as its "initial behavior".
import akka.actor.Actor
import akka.event.Logging
 
class StatusActor extends Actor {
  val log = Logging(context.system, this)
  def receive = {
    case "complete" => log.info("Task Completed!")
    case "pending" => log.info("Task Pending!")
    case _ => log.info("Task Status Unknown")
  }
}
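The recommendation to keep a Props factory on the companion object could look roughly like the sketch below. It assumes the classic Akka API; GreeterActor and its greeting parameter are hypothetical.

```scala
import akka.actor.{Actor, Props}

object GreeterActor {
  // Factory method: keeps the creation of Props next to the actor definition
  // and outside of any enclosing actor.
  def props(greeting: String): Props = Props(new GreeterActor(greeting))
}

class GreeterActor(greeting: String) extends Actor {
  def receive: Receive = {
    case name: String => sender() ! s"$greeting, $name"
  }
}
```

A caller would then create the actor with something like system.actorOf(GreeterActor.props("Hello"), "greeter").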

Messages
Messages are mostly defined as Scala case classes, which are immutable by default and integrate easily with pattern matching. Messages are sent to an Actor through one of the following Scala methods.
  • ! means "fire-and-forget", i.e. send a message asynchronously and return immediately. Also known as tell.
  • ? sends a message asynchronously and returns a Future representing a possible reply. Also known as ask.
Since ask has performance implications, it is recommended to use tell unless ask is really required. Message ordering is guaranteed on a per-sender basis. Below is an example of sending messages with tell.
import akka.actor.{Props, ActorSystem}
 
object ActorDemo {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("demo-system")
    val props = Props[StatusActor]
    val statusActor = system.actorOf(props, "statusActor-1")
    statusActor ! "pending"
    statusActor ! "blocked"
    statusActor ! "complete"
    system.terminate()
  }
}
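For comparison, a request-response interaction with ask might look like the following sketch. It assumes the classic Akka 2.6 API (akka.pattern.ask and akka.util.Timeout); LengthActor and AskDemo are hypothetical names, and Await is used only to keep the example short.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical actor that answers every String with its length.
class LengthActor extends Actor {
  def receive: Receive = {
    case s: String => sender() ! s.length
  }
}

object AskDemo {
  def lengthOf(text: String): Int = {
    val system = ActorSystem("ask-demo")
    // ask needs a timeout, after which the returned Future fails.
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val actor = system.actorOf(Props[LengthActor](), "lengthActor")
      val reply = (actor ? text).mapTo[Int] // Future[Int]
      Await.result(reply, 3.seconds)        // blocking only for the demo
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(lengthOf("pending"))
}
```

In real code the Future would normally be composed or piped to another actor rather than awaited.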

A message can be replied to using the sender() method, which provides the ActorRef of the sending Actor. To reply to a message we just need to use sender() ! ReplyMsg. If there is no sender, i.e. the message was sent without an actor or future context, the default sender is a 'dead-letter' actor reference. Messages can also be forwarded from one actor to another. In that case, the reference to the original sender is maintained even though the message goes through a mediator. This is helpful when writing actors that work as routers, load balancers, replicators etc.
import akka.actor.{Actor, ActorRef, ActorSystem, Props}

class ParentActor extends Actor {
  // Create the child once; creating it per message would reuse the name "ChildActor" and fail
  private val child: ActorRef = context.actorOf(Props[ChildActor], "ChildActor")

  def receive = {
    case "I got your message" =>
      println("Reply received from " + sender().path.name)
    case message: String =>
      println("Message received from " + sender().path.name + " message: " + message)
      child ! message // sent with tell, so the parent becomes the sender seen by the child
  }
}

class ChildActor extends Actor {
  def receive = {
    case message: String =>
      println("Message received from " + sender().path.name + " message: " + message)
      println("Replying to " + sender().path.name)
      sender() ! "I got your message" // Child Actor replying to parent actor
  }
}

object ActorExample {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("ActorSystem")
    val actor = actorSystem.actorOf(Props[ParentActor], "ParentActor")
    actor ! "Hello"
  }
}
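Forwarding differs from tell in that the original sender is preserved. A minimal sketch, assuming the classic Akka API and using the hypothetical names EchoWorker and ForwardingRouter:

```scala
import akka.actor.{Actor, ActorRef, Props}

// Hypothetical worker that replies to whoever is recorded as the sender.
class EchoWorker extends Actor {
  def receive: Receive = {
    case msg => sender() ! s"echo: $msg"
  }
}

// Hypothetical router: forward keeps the original sender, so the worker's
// reply goes straight back to the requester instead of to the router.
class ForwardingRouter extends Actor {
  private val worker: ActorRef = context.actorOf(Props[EchoWorker](), "worker")

  def receive: Receive = {
    case msg => worker.forward(msg)
  }
}
```

Had the router used worker ! msg instead, the worker's reply would have been delivered to the router.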

ActorContext
ActorContext has the contextual information for the actor and the current message. It is used for spawning and supervising child actors, for watching other actors in order to receive Terminated(otherActor) messages, for logging, and for request-response interactions with other actors using ask. ActorContext is also used for accessing the actor's own ActorRef, using context.self in the Scala API (getSelf() in the Java API).

ActorRef
ActorRef is a reference or immutable handle to an actor within the ActorSystem. It is shared between actors, as it allows other actors to send messages to the referenced actor. Each actor has access to its canonical (local) reference through the ActorContext.self field. An ActorRef always represents an incarnation (path and UID), not just a given path: once an actor is stopped and a new actor is created with the same name, the original ActorRef will not point to the new actor. There are special types of actor references which behave like local actor references, e.g. PromiseActorRef, DeadLetterActorRef and EmptyLocalActorRef.

ActorPath
Actors are created in a strictly hierarchical fashion. Hence there exists a unique sequence of actor names given by recursively following the supervision links between child and parent down towards the root of the actor system. ActorPath is a unique path to an actor that shows the creation path up through the actor tree to the root actor. An actor path consists of an anchor, which identifies the actor system, followed by the concatenation of the path elements, from the root guardian to the designated actor; the path elements are the names of the traversed actors and are separated by slashes. An ActorPath can be created without creating the actor itself, unlike an ActorRef, which requires a corresponding actor. The ActorPath of a terminated actor can be reused by a new incarnation of the actor. ActorPaths are similar to file paths, for example "akka://my-sys/user/service-a/worker1".

ActorSelection
ActorSelection is another way to refer to actors, similar to ActorRef. It is a logical view of a section of an ActorSystem's tree of Actors, allowing for broadcasting of messages to that section. An ActorSelection points to a path (or to multiple paths using wildcards) and is completely oblivious to which actor incarnation is currently occupying it.
val selection: ActorSelection = context.actorSelection("/user/a/*/c/*")

// resolveOne returns a Future[ActorRef] and needs a timeout (5.seconds requires scala.concurrent.duration._)
val futureRef: Future[ActorRef] = system.actorSelection("/user/myActorName/").resolveOne(5.seconds)

Stopping Actor
Actors can be stopped by invoking the stop() method of either the ActorContext or the ActorSystem class. ActorContext is used to stop a child actor and ActorSystem is used to stop a top-level Actor. The actual termination of the actor is performed asynchronously. Other ways to stop an actor include sending it a PoisonPill, calling terminate() on the ActorSystem and using gracefulStop().

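In the example below, the stop() method of ActorSystem is passed the ActorRef in order to stop the top-level Actor.
import akka.actor.{ActorSystem, Props}

object ActorExample {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("ActorSystem")
    // Props must name an Actor class; RootActor is the actor defined earlier in this post
    val actor = actorSystem.actorOf(Props[RootActor], "RootActor")
    actor ! "Hello"
    actorSystem.stop(actor) // stops the top-level actor asynchronously
  }
}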
A child actor can be stopped by its parent actor by calling the stop() method of the parent's ActorContext and passing the child's ActorRef, as shown in the example below.
class ParentActor extends Actor{  
  def receive = {  
    case message:String => println("Message received " + message);  
    val childactor = context.actorOf(Props[ChildActor], "ChildActor");  
    context.stop(childactor);  
      
    case _ => println("Unknown message");  
  }  
}  
The terminate() method of ActorSystem stops the guardian actor, which in turn recursively stops all its child actors.
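The other stopping mechanisms mentioned above, PoisonPill and gracefulStop(), can be sketched as follows. This assumes the classic Akka 2.6 API (akka.pattern.gracefulStop); PrinterActor and StopDemo are hypothetical names.

```scala
import akka.actor.{Actor, ActorSystem, PoisonPill, Props}
import akka.pattern.gracefulStop
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical actor that just prints what it receives.
class PrinterActor extends Actor {
  def receive: Receive = {
    case msg => println(s"processing: $msg")
  }
}

object StopDemo {
  // Returns true if the second actor stopped within the timeout.
  def run(): Boolean = {
    val system = ActorSystem("stop-demo")
    try {
      val first = system.actorOf(Props[PrinterActor](), "first")
      first ! "work"
      // PoisonPill is queued like an ordinary message, so "work" is
      // processed before the actor stops.
      first ! PoisonPill

      val second = system.actorOf(Props[PrinterActor](), "second")
      second ! "more work"
      // gracefulStop sends a PoisonPill by default and completes the Future
      // once the actor has actually terminated.
      val stopped: Future[Boolean] = gracefulStop(second, 5.seconds)
      Await.result(stopped, 6.seconds)
    } finally system.terminate()
  }
}
```

gracefulStop is useful when the caller needs to know that the actor has really terminated before continuing, for example during an ordered shutdown.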

Exceptions
Exceptions can occur while an actor is processing a message. When an exception occurs while the actor is processing a message taken out of its mailbox, that message is lost. The mailbox and the remaining messages in it remain unaffected. In order to retry processing the message, the exception must be caught and handled inside the actor. If code within an actor throws an exception, that actor is suspended and the supervision process is started. Depending on the supervisor's decision, the actor is resumed, restarted (wiping out its internal state and starting from scratch) or terminated.

Scheduler
Scheduler is a trait (akka.actor.Scheduler) that is used to schedule the sending of messages and the execution of tasks. Each ActorSystem creates its own Scheduler instance, available as system.scheduler, for scheduling tasks to happen after a given delay. Each scheduling call returns a Cancellable reference to the scheduled operation, which can be cancelled by calling the cancel method on the reference object.
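A minimal sketch of scheduling messages follows. It assumes Akka 2.6, where scheduleOnce and scheduleWithFixedDelay are available on system.scheduler; TickActor and SchedulerDemo are hypothetical names.

```scala
import akka.actor.{Actor, ActorSystem, Cancellable, Props}
import scala.concurrent.duration._

// Hypothetical actor that prints every tick it receives.
class TickActor extends Actor {
  def receive: Receive = {
    case "tick" => println("tick")
  }
}

object SchedulerDemo {
  // Returns true if the repeating task was cancelled by this call.
  def run(): Boolean = {
    val system = ActorSystem("scheduler-demo")
    import system.dispatcher // ExecutionContext required by the scheduler
    try {
      val ticker = system.actorOf(Props[TickActor](), "ticker")

      // Send a single "tick" after 100 milliseconds.
      system.scheduler.scheduleOnce(100.millis, ticker, "tick")

      // Send "tick" repeatedly, waiting 200 milliseconds between sends.
      val repeating: Cancellable =
        system.scheduler.scheduleWithFixedDelay(Duration.Zero, 200.millis, ticker, "tick")

      Thread.sleep(1000) // let a few ticks arrive (demo only)
      repeating.cancel() // stop the repeating task
    } finally system.terminate()
  }
}
```

The sleep is there only so that the demo has time to print a few ticks before the repeating task is cancelled.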

Logging
Akka also comes with a built-in logging utility for actors, which an actor can access by simply mixing in the akka.actor.ActorLogging trait. The Logging object provides error, warning, info and debug methods for logging messages. The Logging constructor's first parameter is any LoggingBus, specifically system.eventStream, while the second parameter is the source of the logging channel. Logging is performed asynchronously to ensure that it has minimal performance impact. Log events are processed by an event handler actor that receives the log events in the same order in which they were emitted. By default log messages are printed to STDOUT, but Akka can also plug in an SLF4J logger or any custom logger. By default, messages sent to dead letters are logged at INFO level. Akka provides additional configuration options for very low-level debugging and customized logging.
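As a small sketch of the ActorLogging trait in use (assuming the classic Akka API; AuditActor is a hypothetical name):

```scala
import akka.actor.{Actor, ActorLogging}

class AuditActor extends Actor with ActorLogging {
  def receive: Receive = {
    // `log` is provided by ActorLogging; {} is a placeholder for the argument.
    case msg: String => log.info("Received message: {}", msg)
    case other       => log.warning("Unexpected message: {}", other)
  }
}
```

Using the placeholder form avoids building the log string when the corresponding log level is disabled.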

When to use Akka

Akka is a good fit for applications that would otherwise rely on shared mutable state accessed by multiple threads that have to be synchronized. It provides an elegant solution where multiple threads need to communicate with each other asynchronously in order to provide concurrency. Akka actors run on Java threads internally, and a thread is assigned only to an actor that has work to do and is not already occupying a thread. This prevents the application from running thousands of threads simultaneously, which would add overhead on most machines, especially at peak loads. Akka enforces encapsulation of behavior and state within individual actors without resorting to locks. Because actors pass messages to each other, they avoid blocking and the use of Java's inter-thread communication methods such as wait(), notify() and notifyAll(). Akka also handles errors more gracefully: when a supervisor actor is informed of a child actor's failure, it can carry out retries by creating new child actors. Akka encourages a push mindset, where entities react to signals, change state, and send signals to each other to drive the system forward. Akka can provide better performance than using native Java threads directly, even compared with Executors using a standard implementation and a fixed number of threads.