Communicators: Actors with purely functional state
In Scala, Akka actors, as in the traditional Actor model, may
modify private state. The accepted convention is to have a mutable object (e.g. a
Map
), a var
, and mutate it like so:
class Library extends Actor {
var books = scala.collection.mutable.Map.empty[String, String]
def receive: Receive = {
case AddBook(isbn, title) => books += (isbn -> title)
}
}
object Library {
case class AddBook(isbn: String, title: String)
}
This is a bad idea. There are several reasons for this. First, Scala eschews var
s, they should
only be used when absolutely necessary (read: never). There is also an additional need for
thread-safety for the collection, not because of the receive
method itself. The receive
method
is
guaranteed to run inside a single thread. However,
an unsuspecting user might still launch a Future
and modify the collection, leading to
unpredictable behaviour. Such concurrent mutations on a var
put strain on the garbage collector,
in fact, it often necessitates the existence of a garbage collector.1 Lastly, as with any mutable
state and possible lack of referential transparency, the code can become hard to reason about.
Thankfully, Akka actors offer a possibility to do this completely functionally. The function
context.become
allows an Actor to change its receive
method on-the-fly. In other words, it lets
the Actor change its state and communication model. Here’s the above implemented using this
paradigm:
class Library extends Actor {
def receive = active(Map.empty)
def active(books: Map[String, String]): Receive = {
case AddBook(isbn, title) => {
// for immutable maps, += returns a new collection
context.become(active(books += (isbn -> title)))
}
}
}
The active
function returns a new Receive
, receiving the current actor state as its
parameter. Adding logic to it is now easy:
class Library extends Actor {
def receive = active(Map.empty)
def active(books: Map[String, String]): Receive = {
case AddBook(isbn title) => {
if (books.size < 10) {
context.become(active(books += (isbn -> title)))
} else {
sender() ! "Too many books"
}
}
}
}
The above code is now thread-safe and doesn’t use mutable collections, but what if our logic gets
more complicated? What if we need to talk to another Actor, or talk to the sender of the message?
This is where we stumble upon a design feature of Akka: all of its Actors are actually compiled
down into a callback-based implementation. There is no guarantee that a Future
launched in a
receive case will be running in the same thread as the next! One could argue that this is not a
feature but a flaw, but I won’t go that far. Hence, code dealing with Futures in Akka actors needs to deal with the unforgiving reality that there is no
guarantee of thread safety. Case in point:
class Library(popReservation: String => Future[String]) extends Actor {
def receive = active(Map.empty)
def active(books: Map[String, String]): Receive = {
case AddBook(isbn, title) => { ... } // as before
case AskForBook(isbn) => {
popReservation(isbn) foreach { i =>
// AAH!!!
context.become(active(books - i))
sender() ! s"Here you go: $i"
}
}
}
}
Why am I screaming in the comments? First, as calling map
for our Future launches a new thread, we
have no idea whether sender()
returns the same value in the new thread, and second, we may be
modifying the books collection concurrently with other threads - leaving the garbage collector to
collect our mess. So we strain the GC and risk giving the book to the wrong caller!
Since the actual execution of a Future is left to the execution context, which in the case of Actors
is the ActorSystem
s dispatcher, we may or may not be invoking sender()
in the right thread
— there is simply no guarantee. We can’t reason about it, it has been hidden from us.
To deal with this, Akka has introduced the pipe
pattern, which is an implicit given to Futures
which solves this:
class Library(popReservation: String => Future[String]) extends Actor {
def receive = active(Map.empty)
def active(books: Map[String, String]): Receive = {
case AddBook(isbn, title) => { ... } // as before
case AskForBook(isbn) => {
// launch another thread
val reservation: Future[String] = popReservation(isbn) map { i =>
s"Here you go: $i"
context.become(active(books - isbn)) // AAH!
}
// but sender() is still the same
reservation pipeTo sender
}
}
}
Another option is to fix the reference of sender
:
val s = sender()
val reservation: Future[String] = popReservation(isbn) map { i =>
s ! s"Here you go: $i"
context.become(active(books - isbn)) // AAH!
}
Ok, now we’ve fixed sender()
, but what about the books
collection? Let’s add a PopBook(isbn:
String)
case class, and handle that for removals:
class Library(popReservation: String => Future[String]) extends Actor {
def receive = active(Map.empty)
def active(books: Map[String, String]): Receive = {
case AddBook(isbn, title) => { ... } // as before
case PopBook(isbn) => context.become(active(books - isbn))
case AskForBook(isbn) => {
// launch another thread
val reservation: Future[String] = popReservation(isbn) map { i =>
s"Here you go: $i"
self ! PopBook(i)
}
// but sender() is still the same
reservation pipeTo sender
}
}
}
Sending messages to self
is always thread-safe - the reference does not change over time. So, at
this point, it seems clear that making actor code thread-sane involves the use of:
- immutable state - call
context.become
with a closure over the new actor state, - converting asynchronous state modifications as messages to be handled later, and
- making sure the
sender()
reference is consistent
What about complicated states? What if we need to react differently to these messages, e.g., when
the library is closed? I sense that you’re about to mention Akka’s FSM
construct, which
builds a state machine, encapsulating state and transitions to what is essentially syntactic sugar,
and on the surface, seems like a good idea.
Enter Akka FSMs
At a closer look, it essentially leads us to repeat the same mistakes as above, and the arguments against it are argumented here. In summary, it boils down to:
- Akka FSM’s is too restrictive. You cannot handle multi-step or complicated state transitions, and modeling undeterministic behaviour is impossible.
- You are tied to Akka completely, you must use Akka testkit for your tests. Anyone who has worked with testkit knows this to be a burden.
- State transitions have identity instead of being truly functional, that is, FSMs alter the current state instead of producing a new one.
Moreover, and I think this is the biggest shortcoming, the Akka FSM are finite-state automata
— they are characterised by the state transition function (Input, State) => State
. Since we
know actors are more about communication than anything else, this model is insufficient, and what we
need is a state machine that can produce output: a finite state transducer. Its state transition
function has the signature (Input, State) => (Output, State)
- every transition produces an
output, and Scala can model this efficiently:
trait FSA[State, Input, Output] {
def transition(s: State, i: Input): (Option[Output], State)
}
With all these flaws, despite being a nice idea at a glance, it’s obvious that for any complicated logic Akka FSM’s aren’t sufficient.
Let’s envision a radical version of actors, accounting for all the flaws described above:
- State transitions should be about producing a new state, i.e.
(Input, State) => (Output, State)
- Actor computations will deal with asynchronous code, we must deal with this intelligently
- Keep I/O logic out of actors - the actor only communicates with the external world
- Actors should only mutate their state with with
context.become
The last bullet point is especially important, as it constrains state changes
to be entirely functional, as you can simply make a function def foo(state:
State): Receive
, and keep calling it recursively, by transitioning states
thusly:
def active(state: State): Receive = {
case someInput: Input => context become active(state)
}
This idea is not new. Erlang actors have worked like this for actual decades, and arguments for using this method in Scala can be found left and right, summarized particularly well in Alexandru Nedelcu’s Scala best practices.
active(Sum) ->
receive
{From, GetValue} -> From ! Sum;
{n} -> active(Sum + n)
end.
Putting emphasis on the last point, I’ve come up with a moniker called communicators.
Actor, meet communicator
Let’s define the Communicator
trait first independently:
trait Communicator[State, Input, Output] extends Actor {
/** This is the initial actor state */
def initial: State
/** The state transition function */
def process(state: State, input: Input): Future[(Option[Output], State)]
/** The output processing function */
def handle(state: State, output: Output, origin: ActorRef): Future[Unit]
}
initial
is simply the initial state machine state, process
is the state transition function and
handle
is the function that will deal with dispatching the result of process
. Because we’re
producing content in another thread, we want to make sure the reference of sender
is fixed, and by
using this with the pipeTo
pattern, we get thread safety. Let’s extend the Actor
trait to get
receive
trait Communicator[State, Input, Output] extends Actor {
/** This is the initial actor state */
def initial: State
/** The state transition function */
def handle(state: State, product: Output, origin: ActorRef): Future[Unit]
/** The output processing function */
def process(state: State, input: Input): Future[(Option[Output], State)]
def receive = active(initial)
/** I/O handling which the deriving class must implement */
def active(newState: State): Receive
}
The active
function is the actual output-producing function. The user is left to define three
things:
- the initial actor state in
initial
- the output dispatch function
handle
- the state transition function
process
- the
active
function which handles input and output
To see this in action, first, let’s define the application states.
object Library {
// Library state
case class LibraryState(open: Boolean, books: Map[String, String])
// Input alphabet
sealed trait LibraryInput
case class SetOpen(o: Boolean) extends Input
case class AddBook(isbn: String, title: String) extends Input
case class GetBook(isbn: String) extends Input
// Output alphabet
sealed trait LibraryOutput
case object SorryWeAreClosed extends Output
case object DoNotHaveIt extends Output
case object SorryReserved extends Output
case class Book(isbn: String, title: String) extends Output
case class Reservation(isbn: String, title: String) extends Output
}
The actual state is just a case class: this gives us the nice copy
function for easy updates. Then
we use polymorphism to implement the input and output alphabets. Then we implement the actor itself:
class Library(getReservation: String => Future[Boolean])
extends Communicator[LibraryState, LibraryInput, LibraryOutput] {
import Library._
def initial = State(false, scala.collection.immutable.Map.empty)
override def active(newState: LibraryState): Receive = {
case (output: LibraryOutput, origin: ActorRef) => handle(output, origin)
case input: LibraryInput => {
val origin = sender()
process(newState, input) map {
case (output, state) =>
output foreach { o =>
self ! (o, origin)
}
self ! state
}
}
}
override def process(state: State, input: Input): Future[(Option[Output], State)] =
input match {
case SetOpen(o) => Future.successful((None, state.copy(open = o)))
case (GetBook(_) | AddBook(_, _)) if !state.open =>
Future.successful((Some(SorryWeAreClosed), state))
case GetBook(isbn) => {
val book =
for {
(isbn, title) <- state.books.get(isbn)
} yield {
getReservation(isbn) map { reserved =>
if (!reserved) {
(Some(Book(isbn, title)), state.copy(books = state.books - isbn))
} else {
(Some(SorryReserved), state)
}
}
}
book getOrElse Future.successful((Some(DoNotHaveIt), state))
}
case AddBook(isbn, title) =>
Future.successful((None, state.copy(books = state.books + (isbn -> title))))
}
override def handle(state: State, output: Output, origin: ActorRef): Future[Unit] = {
Future {
origin ! output
}
}
}
Decoupling Akka
So, now we’ve made a very thin actor, with little I/O logic inside it, but it’s still an
actor. Let’s decouple it entirely from actor semantics. First, we define a StateMachine[I, O]
trait:
trait StateMachine[I, O] {
def process(input: I): Future[(Option[O], StateMachine[I, O])]
}
And excise the state logic from the Communicator, moving it to the State
case class:
case class LibraryState(open: Boolean, books: Map[String, String], getReservation: String => Future[Boolean])(
implicit ec: ExecutionContext)
extends StateMachine[LibraryInput, LibraryOutput] {
def process(input: LibraryInput): Future[(Option[LibraryOutput], LibraryState)] = {
input match {
case SetOpen(o) => Future.successful((None, copy(open = o)))
case (GetBook(_) | AddBook(_, _)) if !open =>
Future.successful((Some(SorryWeAreClosed), copy()))
case GetBook(isbn) => {
val book =
for {
title <- books.get(isbn)
} yield {
getReservation(isbn) map { reserved =>
if (!reserved) {
(Some(Book(isbn, title)), copy(books = books - isbn))
} else {
(Some(SorryReserved), copy())
}
}
}
book getOrElse Future.successful((Some(DoNotHaveIt), copy()))
}
case AddBook(isbn, title) =>
Future.successful((None, copy(books = books + (isbn -> title))))
}
}
}
You may be wondering: wait, where’s the handle
implementation? We kept that out from the state
machine class since it’s not its responsibility - so we keep that in the Communicator:
class Library(getReservation: String => Future[Boolean])
extends Communicator[LibraryInput, LibraryOutput, LibraryState] {
import context.dispatcher
def initial = LibraryState(false, scala.collection.immutable.Map.empty, getReservation)
override def handle(output: LibraryOutput, origin: ActorRef): Unit = origin ! output
override def active(newState: LibraryState): Receive = {
case (output: LibraryOutput, origin: ActorRef) => handle(output, origin)
case state: LibraryState => context become active(state)
case input: LibraryInput => {
val origin = sender()
newState.process(input) map {
case (output, state) => {
output foreach { o =>
self ! (o, origin)
}
self ! state
}
}
}
}
}
So, all state is kept neatly in a separate entity that’s entirely unit testable in its own right
without having to rely on Akka testkit or the like – input and output dispatch and state
transitions are done in the active
method.
I know the state case class manipulation introduces more boilerplate, but as long as that
boilerplate isn’t complicated, I think this is a fair compromise. Plus, one can
use lenses to remove some of the boilerplate, e.g.,
by defining handy update functions. One could cook up something doggedly interesting using Cats and
StateT
- as long as you provide a function of the kind (I, S) => (Option[O], S)
, the sky is the limit.
Thanks to Jaakko Pallari (@jkpl) for previewing this.
-
This is actually false, as Aaron Turon, a core Rust developer, proves in his article about getting lock-free structures without garbage collection. ↩