Monday, 18 March 2013

Event-sourced architecture, travelling Java -> Scala (Part 3: Continuations)

No threads - the Akka way

Erlang OTP, like any great solution, can't hold a 'monopoly' position forever. Akka has taken the best from Erlang OTP and from other frameworks and languages. One of the most important questions for a threadless design: can we avoid blocking native threads on the JVM? The answer is yes, we can. Let's look closer at the Typesafe platform. Akka comes with two 'colours' of Actors:
  • Thread based - a self-describing name: Actors are bound to Threads via a thread execution pool, sometimes with pool variations, but the main idea is always the same. The Actor is a wrapper around a native thread: it helps to manage locking, synchronisation and so on, but the engine is the thread itself.
  • Event based - a threadless implementation that brings over the behaviour known from Erlang practice.

Event based actors

Honestly - how can we implement Erlang processes on a JVM that doesn't support them? The answer is: without patching the JVM it's a non-trivial task. And if we patch the JVM, we shut our solution out of the enterprise market.
As I mentioned in the previous post, when we use event-based actors we should respect two facts:
  • Threadless style - we can't use thread features such as sleep or wait anymore, otherwise we will impact others.
  • We have to work only via message exchange between our executors, or via callback functions (I will show later how to simplify this approach).
Here is an example which implements the simplest workaround: a non-blocking approach on the JVM without patching the JVM itself.
Imagine a service (Actor) that delegates String operations to another actor, StringBuilderService (for us the first one is a Mediator).


The gist represents the main parts of the example: 2 actors cooperating with each other. Let's restrict ourselves to the special case where there is only one Thread for both Actors. We definitely must introduce some switching mechanism between the Actors so that they can share the same Thread for their execution.
If it were Erlang, we could rely on the VM's stack management: save the thread stack and switch to another waiter. While the stack is isolated (it doesn't share memory with others), it's easy to save its area before switching. If a language supports this behaviour, it is called a language with first-class continuations.
The JVM doesn't support explicit stack management operations.
Let's review the sequence diagram for the case when we had no first-class continuations (before Scala 2.8). It's really easy to reuse it in Java:
Throw in case of returning the value
The figure is a little bit complicated, so I will describe it in detail. There is one Thread and two Actors. Both are assigned to the given thread. Because of that we have to find a way for the Actors to release the thread when their execution is done and return the execution stack to its previous state.
Here we start when the Thread is assigned to ClientActor:

    1. ClientActor executes its body inside Thread1.
    2. ClientActor sends the Part("Starting") message to the StringBuilderActor mailbox.
    -- ClientActor explicitly releases the thread by calling the Dispatcher, giving the message receiver the chance to dispatch the message.
    3. The thread is assigned to StringBuilderActor, because it has a message in its mailbox.
    3.1 The message is matched against the Actor's cases and removed from the mailbox.
    3.2 StringBuilderActor saves a partially applied function as its state; from now on its receive method will accept a String as the argument to that partially applied function.
    -- StringBuilderActor throws a specially defined exception. This is a mandatory alternative to blocking the thread (Thread.wait), because there is no other way to return the stack to its previous state. There is only one thread in our system, and we simulated its blocking in the first actor by (implicitly) calling the other one. The exception unwinds the stack to the nearest catch block for the specially defined exception, which of course is inside the first Actor. Although the exception is thrown, the state of StringBuilderActor has been committed. The exception is the only way to revert the stack. If we called wait() as usual, we would block the whole flow, since the thread belongs to both Actors.
    4. The same as 1.
    5. ClientActor sends the Part("Done") message to the StringBuilderActor mailbox.
    -- The predictable release of the thread, giving control to the message Dispatcher to find the Actor which can dispatch our message.
    6. The same as 3.
    6.1 The same as step 3.1.
    6.2 Finally we can apply the one missing argument to the partially applied function (it was produced by the previous message match; the state is stored via partial application of the arguments).
    -- StringBuilderActor finishes its execution and throws the exception again to roll the stack back to its previous position.
    7. The flow moves on; the thread is assigned to whatever Actor requires execution.
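
The steps above can be condensed into a minimal single-threaded sketch. It is not the original gist: the names `Suspend`, `send` and the way state is stored are assumptions made for illustration, and the `send` function plays the role of both ClientActor and Dispatcher.

```scala
import scala.collection.mutable
import scala.util.control.NoStackTrace

object ExceptionUnwindSketch {
  // Hypothetical control exception: it carries no data, it only unwinds the stack.
  final class Suspend extends RuntimeException with NoStackTrace

  final case class Part(text: String)

  final class StringBuilderActor {
    val mailbox: mutable.Queue[Part] = mutable.Queue.empty
    // State between messages: a function still waiting for its last argument.
    private var pending: Option[String => String] = None
    var result: Option[String] = None

    // Handles one message, commits the new state, then gives the thread back by throwing.
    def receive(): Nothing = {
      val text = mailbox.dequeue().text
      pending match {
        case None =>
          pending = Some((next: String) => text + " " + next)
        case Some(finish) =>
          result = Some(finish(text))
          pending = None
      }
      throw new Suspend
    }
  }

  // Caller side: put the message in the mailbox, run the receiver on the same
  // thread, and catch the exception that returns the stack to this frame.
  def send(actor: StringBuilderActor, message: Part): Unit = {
    actor.mailbox.enqueue(message)
    try actor.receive()
    catch { case _: Suspend => () }
  }

  def run(): Option[String] = {
    val builder = new StringBuilderActor
    send(builder, Part("Starting"))
    send(builder, Part("Done"))
    builder.result
  }
}
```

Note that the actor's state survives the throw because it lives on the heap, not on the stack that gets unwound.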

You won't believe it, but we have simulated continuations here. This technique is called simulating stack inspection with exception handlers. As we can see, the main idea is being able to save the execution stack when suspending and to restore it when continuing.
I want to avoid the complexity of describing first-class continuations. The outcome of the example is that a continuation consists of two bricks:

  • continuation blocks
  • rollback function. 

I will quote my favourite description for the nature of continuations:
Say you're in the kitchen in front of the refrigerator, thinking about a
sandwich. You take a continuation right there and stick it in your
pocket. Then you get some turkey and bread out of the refrigerator and
make yourself a sandwich, which is now sitting on the counter. You
invoke the continuation in your pocket, and you find yourself standing
in front of the refrigerator again, thinking about a sandwich. But
fortunately, there's a sandwich on the counter, and all the materials
used to make it are gone. So you eat it. :-)
What is a possible Scala way?

CPS in Scala

First-class continuations in Scala were implemented via continuation-passing style. You should read the Wikipedia article, which defines the topic well. Be aware that the "Responder and continuation plugin" has to be explicitly enabled in SBT and probably in your IDE. As you may find, there are different types of continuations.

Why the CPS style?
Continuation passing style can be used to implement continuations and control flow operators in a functional language that does not feature first-class continuations but does have first-class functions and tail-call optimization
These were the conditions the Scala developers faced. The functional programming bricks stand for features which are already complex enough ;-) Scala doesn't introduce new keywords, but it introduces two new functions: shift and reset.

Both come from Danvy and Filinski’s model:
there are two primitive operations, shift and reset. With shift, one can access the current
continuation and with reset, one can demarcate the boundary up
to which continuations reach: A shift will capture the control
context up to, but not including, the nearest dynamically enclosing
reset (Biernacki et al. 2006; Shan 2007)

CPS style via shift-reset is like the inverse of a GOTO construction. The innermost functions of the normal programming style become the outermost, and vice versa.
Let's dive deeper with the help of examples:
Result: won't compile, because shift must always be enclosed in a reset. This is a Scala feature that catches such issues at compile time.
Result: the continuation isn't invoked, so there is no result. But you can see that continuations are hosted as closures in Scala.
Result: 7 is printed, we just return the value of the parameter itself.
Result: 8 is printed. Actually the closure generated by shift inside the reset is the same as the one plusOne contains. Rephrasing explicitly:
((cont: Int => Int) => cont(7))((in: Int) => in + 1)
Well, it isn't easy to understand even with an explanation, and the syntax is not very clean. Although continuations are based on functions as first-class citizens, the new construction is hard to read. Believe me, it's just a question of experience; it's not harder to use than GOTO. Shining some light into the corners:
     1. The compiler has been changed (via the plugin), so code inside the reset closure now behaves in a special way.
     2. We get a new term: "slice of the continuation frame". This is everything that comes inside the reset closure, in our case: {shiftFunction + 1}. Precisely, the frame is everything inside reset except the shift closure itself.
     3. The slice of the continuation frame is taken by the compiler as a first-class function and turned into the argument to shiftFunction.
     4. It should blow your mind, but this behaviour is called "inversion of control". IoC, something that was familiar to you in the past (if you were a Java fan), isn't it?
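
Since the continuations plugin is not available on recent Scala versions, here is a rough sketch of points 2 and 3 written by hand with plain functions. It is an approximation of what the compiler does, not its actual output; the names `shiftBody`, `plusOne` and the value 7 in the last two cases are assumptions for illustration.

```scala
object ManualShiftReset {
  // Stand-in for the body of `shift`: it receives the rest of the reset block as `cont`.
  val shiftBody: (Int => Int) => Int = cont => cont(7)

  // Stand-in for the "slice of the continuation frame" of reset { shift(...) + 1 }.
  val plusOne: Int => Int = in => in + 1

  // The continuation is invoked once: 7 + 1.
  val once: Int = shiftBody(plusOne)

  // The continuation is never invoked: the shift body's own value is the result of reset.
  val never: Int = ((cont: Int => Int) => 0)(plusOne)

  // The continuation is invoked twice, then the result flows into code outside reset (_ * 2).
  val twice: Int = ((cont: Int => Int) => cont(cont(7)))(plusOne) * 2
}
```

Here `once` evaluates to 8, `never` to 0 and `twice` to 18. The inversion of control is visible in the types: the code that textually follows the shift is handed to the shift body as an ordinary function.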

Adding complexity to the examples:
This one shows that Scala validates the presence of shift inside the stack of reset, not just inside the closure. In the end shift applies the _ + 1 closure twice, and the result is passed as a parameter to the _ * 2 closure.
Result: prints 0, because that is the value we return, even though some computations were done before.
Result: prints acdcdcd-bbb
This is a reworked example from the official documentation; it shows how we can run a continuation in a different thread.
The more examples like that you see, the worse your understanding gets. I have a bad feeling about that. My examples are even simplified versions of the ones on the official Scala site. By now you should have the feeling that none of this explains where the threadless part is hidden, how shift-reset would help us with async operations, and finally, why shift-reset at all?
It might be the same emotion you had when you dived into Monads. The Scala developers are ignoring the community's need to be trained. The official proposal document is freaky, no joke: even if you have a PhD in mathematics you won't find much practical sense in it, because it doesn't give any good examples of application.
I recommend you spend 5 minutes reviewing the .NET documentation for its continuations implementation. First of all, the keywords are easy to grasp and remember: async and await. The examples walk us from a real-life issue to a solution. Compare that with the official scaladoc example.
Let's return to our web application. You should remember the NIO 2 API, which allows attaching a thread to a connection only when dispatching is required; but if the bound thread is blocked by the dispatcher's execution, it can't be reused by others. Here is a real-life use case: imagine an SOA-like application. Inside the dispatcher we must call other services. The way we usually do it:
Because we wait for the results, we block our thread twice. I'm sure you already have a bad feeling about this waste of system resources. If we apply the best practices from the async libraries, we convert the service calls into callback functions. Scala's closures and function currying are extremely suitable here:
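
A minimal sketch of what such callback-style code might look like. The services `loadUser` and `loadOrders` are hypothetical and answer immediately; a real service would invoke the callback later, from its I/O layer.

```scala
object CallbackNesting {
  // Hypothetical services: each takes its input first and the callback in a
  // second (curried) parameter list, so the call site reads like a block.
  def loadUser(id: Int)(callback: String => Unit): Unit =
    callback(s"user-$id")

  def loadOrders(user: String)(callback: String => Unit): Unit =
    callback(s"orders-for-$user")

  // The dispatcher never waits for a result; every next step lives in a nested closure.
  def handle(id: Int)(respond: String => Unit): Unit =
    loadUser(id) { user =>
      loadOrders(user) { orders =>
        respond(s"$user has $orders")
      }
    }

  // Helper for trying the sketch out: collects the response into a local variable.
  def demo(): String = {
    var response = ""
    handle(42)(text => response = text)
    response
  }
}
```

Each additional dependent service call adds one more level of indentation to `handle`.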
Not bad, but the nesting is starting to smell; can you imagine what happens with more nesting levels?
Let's do the same with the help of shift-reset. To make the code simulate delays I introduced a fake service that invokes a callback function. We will use a Timer with scheduling, which simulates the delay of possible network operations. Never mind the implementation; in real life the library will take care of the threadless part, for example by using NIO 2:
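
A possible shape for such a fake delayed service, sketched with `java.util.Timer`. The names `FakeDelayedService` and `respondLater` are assumptions, not the original gist, and `demo` blocks only so that the result can be observed outside of any continuation machinery.

```scala
import java.util.{Timer, TimerTask}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object FakeDelayedService {
  // Daemon timer thread standing in for the network layer.
  private val timer = new Timer("fake-network", true)

  // Returns immediately; the callback fires on the timer thread after the delay.
  def respondLater(value: String, delayMillis: Long)(callback: String => Unit): Unit =
    timer.schedule(new TimerTask {
      override def run(): Unit = callback(value)
    }, delayMillis)

  // Demo helper only: waits for the callback so the caller can inspect the value.
  def demo(): String = {
    val received = Promise[String]()
    respondLater("pong", 50L) { value => received.success(value); () }
    Await.result(received.future, 2.seconds)
  }
}
```

The calling thread is free as soon as `respondLater` returns, which is the property the rest of the example relies on.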
Then we simulate 2 service calls using our fake delayed responder:
And finally the reworked client code:
Looks better, doesn't it?
The most important thing here is that we don't block the thread. The code delimited by reset is saved into a continuation. A first-class continuation is the possibility to freeze the execution state and the ability to restore it when we need it. You might need to meditate on the code to fully understand what is happening here. It's the most important gist for today.
The assignment to a variable has the type T => Unit, where T is the variable's type, in our case String. So the shift function gets a String => Unit as its input argument. Inside the shift we call a partially applied function that awaits the String => Unit as its last argument. If the compiler can resolve and prove the single argument's type inside the closure, Scala's sugar lets us omit some boilerplate in argument passing. Without the sugar we would have to use this style:
As you can see, continuations in Scala aren't as "Dark Sided" as they look in the official documentation. Let's summarise:

  • We used the threadless way, there is no locking. We "saved the stack" and released the thread instead of blocking it. This behaviour is similar to throwing an exception.
  • The code is more readable compared to callback functions.
If you are a perfectionist, you might notice that shift is repeated twice. As a last improvement we can push it one level up:

As stated before, CPS continuations are based on functions as first-class citizens and on tail-call optimisation. The second one is needed when the shift function is used inside loops. Each time a loop iteration calls the reset function, it pushes the stack back to the upper level. Thanks to that we don't run into stack overflow.
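
To illustrate why a flat stack matters for continuation-passing loops, here is a small sketch using trampolining from the standard library (`scala.util.control.TailCalls`). This is a swapped-in technique chosen for illustration, not necessarily what the continuations plugin does internally; `sumTo` is a hypothetical example function.

```scala
import scala.util.control.TailCalls._

object TrampolinedCps {
  // A loop in continuation-passing style: the final value is handed to `k`
  // instead of being returned. Every step is wrapped in `tailcall`, so it is
  // described as data and executed later by a flat loop inside `result`.
  def sumTo(n: Int, acc: Long, k: Long => TailRec[Long]): TailRec[Long] =
    if (n == 0) tailcall(k(acc))
    else tailcall(sumTo(n - 1, acc + n, k))

  // Runs the trampoline; the JVM stack depth stays constant regardless of n.
  def run(n: Int): Long =
    sumTo(n, 0L, total => done(total)).result
}
```

With a million iterations `TrampolinedCps.run(1000000)` still completes, because no call frame is kept alive between steps.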

Summarising the post: Scala supports threadless and non-blocking behaviour with the help of first-class continuations. They are implemented as Polymorphic Delimited Continuations by a Type-Directed Selective CPS-Transform. With the help of a compiler plugin we can use 2 functions: shift and reset. This is not exactly the way Erlang does it. But in both cases we have to design the application in a functional style, keeping in mind all the limitations and requirements of the async way. You should find the exact list in the library you use to abstract yourself from the problem.

In the first post I mentioned Akka.io as a great solution for event-sourced applications. There should be no magic inside, and if something is declared as non-blocking we will find CPS behind it.
For example, according to a Typesafe blog post we can concatenate two Futures without blocking the thread:

And explanation how it's done:
This uses Continuation Passing Style with Delimited Continuations under the hood, to be able to write code that looks like it's blocking but it reality it isn't. 
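
For comparison, the same idea can be sketched with the standard library's `scala.concurrent.Future`. This is not the Akka dataflow code from the quoted post: a for-comprehension desugars into `flatMap`/`map` callbacks rather than delimited continuations, but it also composes two results without parking a thread while waiting. `FutureConcat` and its methods are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureConcat {
  // Combines two futures without blocking: the yield part runs as a callback
  // once both values are available.
  def concat(a: Future[String], b: Future[String]): Future[String] =
    for {
      x <- a
      y <- b
    } yield x + y

  // Demo helper only: Await blocks, which real non-blocking code would avoid.
  def demo(): String =
    Await.result(concat(Future("Hello, "), Future("world")), 5.seconds)
}
```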
Let's navigate to the Future scaladoc and pay attention to the apply method. Or look into the FutureCPSLoop class. The Future from Akka can be used as an abstraction layer in your application. Even with such helpers it's hard to design applications in a non-blocking way. The cost of concurrent solutions is still high: over short distances (startups) I recommend investing in scaled hosting rather than in true threadless development, while still using libraries which will let you optimise the code in the future. For example, 4000 Threads per CPU with low activity can take up to 70% of the CPU load. With Akka's non-blocking style the CPU load can be reduced to 1 - 5%. The more your business grows, the more nodes you will set up. But any scale-out solution adds complexity in support, for example the colossal clusterf**k: it happens when a node is loaded to 100% and starts failing on time-out limits. The balancer marks the node as "dead" and moves its load to the other nodes, which pushes those nodes into failing too, until the last one is "dying".
Even in a cloud environment this is error-prone: automatic balancing management that copes with it definitely isn't the default you get from cloud providers. Maybe the complexity of scaling out should be taken as an explicit sign that a threadless refactoring is needed.

In the next post I will review how to convert the Domain Model into an immutable style.
