Friday 21 September 2018

Down-streaming with an Event Sourcing application.

How do you apply side effects in an Event Sourcing application with the desired/expected consistency level?
It's common and logical to put side effects at the sink/end of the persistent stream. Usually side effects are dispatched in event-persisted callback handlers. These handlers can also spawn new sub-streams to react to applied events.

There are different tactics for integrating down-streams into the business flow, and this is a serious architectural question that influences all of the application's design layers.

To review the possibilities I'm going to use an example of a typical event-sourcing application. Let's assume it implements some business utility. After each event is dispatched we send a notification to Business Intelligence services for analysis and reporting. Imagine the day when you find out that Business Intelligence operations are fast enough to produce real-time feedback which can increase the profitability of our business by up to X%. We use BI here as an example and a placeholder - in general it could be any down-stream service reacting to our business events. Down-stream services are the ones that consume the upstream service.
Just to agree on terminology - I will use BI in all the examples, but it could be any down-stream service.
Given the terminology 'upstream' and 'downstream', it may help to make an analogy with a river: if you drop a message (data) into the river, it flows from upstream (the initiator) to downstream (the receiver).
Here is the typical event-sourcing flow from request to response including other side effects:


Figure 1. Typical event sourcing flow

After the stream emits a command, it should:
  1. Validate the input against the current state and business rules, then generate mutation events.
  2. Persist the event(s)/state.
  3. Respond to the event producer if needed.
  4. Execute other important side effects.
  5. Send events/messages/state to BI (the down-stream).
Here is pseudo-code that implements the behavior using akka-persistence:
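(A minimal sketch with hypothetical command and event types; biGateway stands for whatever dispatches events to the BI down-stream.)

    import akka.actor.ActorRef
    import akka.persistence.PersistentActor

    // Hypothetical command/event types used only for this sketch.
    final case class AddOperand(value: Int, replyTo: ActorRef)
    final case class OperandAdded(value: Int)

    class BusinessFlowActor(biGateway: ActorRef) extends PersistentActor {
      override def persistenceId: String = "business-flow-1"

      private var state: Int = 0

      override def receiveCommand: Receive = {
        case AddOperand(value, replyTo) =>
          // 1. Validate the input against the current state and business rules.
          if (value >= 0) {
            // 2. Persist the mutation event.
            persist(OperandAdded(value)) { event =>
              state += event.value
              // 3. Respond to the event producer ("at most once").
              replyTo ! state
              // 4./5. Apply other side effects and send the event to the BI down-stream.
              biGateway ! event
            }
          } else {
            replyTo ! "rejected"
          }
      }

      override def receiveRecover: Receive = {
        case OperandAdded(value) => state += value
      }
    }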


It's quite logical to apply all side effects after the mutation has already been persisted (as we follow the up-stream and down-stream patterns). In step 3 we send the response to the client. Our actor is abstracted from delivering this message; in this particular case we rely on Akka's "at most once" guarantee. If the client expects a different one, then we should build a business protocol that provides extra guarantees - for example, handle timeouts, etc.

Here is a sequence diagram that helps us identify the flow's operations with their different consistency levels:

Figure 2. Consistency per stream flow

The legend for Figure 2 is:
  • A to B is covered by your API interface's consistency - for example, it could be a Web-sockets protocol with some custom retry policy. We are going to ignore this step in the examples, assume it has tolerable consistency, and make our architecture design responsible only for the flow from B to E.
  • C - during persistence execution we rely on our persistence journal's consistency - for example, Cassandra's.
  • C to D - after the journal has persisted the event, the flow returns to the app ("at least once").
  • E - is the down-stream - it can start a new sub-stream with its own A to D steps or rely on some other delivery tool, for example Kafka.
As we can see, the weakest consistency is between C and D, and it determines the consistency towards the BI down-stream. Actually, our example implements the first solution with the lowest consistency level, but it has its own pros as well.
Figure 3. BI side-effects with at most once guarantee

Solution #1: At-most-once, i.e. no guaranteed delivery 

If we send events to BI after the persistence step:
  • The response time from the client's perspective doesn't depend on down-streaming performance.
  • BI delivery failures don't influence our business flow.
  • BI can miss events (at-most-once delivery guarantee).
  • Low-latency delivery to the BI down-stream.
We can miss down-stream events - for example, if we use a Kafka producer and rely on Kafka's persistence, we might lose events when the producer.flush operation fails.
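For illustration, here is a minimal fire-and-forget producer sketch (the topic name, broker address and the sendToBi helper are assumptions, not part of the original flow):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object BiFireAndForget {
      // Broker address and topic name are placeholders.
      private val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

      private val producer = new KafkaProducer[String, String](props)

      // The returned future is ignored and nothing waits for the broker
      // acknowledgement, so a failed send/flush silently drops the event.
      def sendToBi(event: String): Unit =
        producer.send(new ProducerRecord[String, String]("bi-events", event))
    }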

Solution #2: Possible redundant events

Figure 4. Allow redundant events 

If the fact that BI receives the events has priority over how they are dispatched, it's possible to change the guarantee to "at least once", allowing duplicates (in case of client retries) and inconsistency with the persisted state (not all reported events end up persisted, but all persisted events are reported).
This solution provides:
  • Better down-stream delivery but worse response latency (persist waits for the down-stream flush to complete).
  • An at-least-once guarantee for the down-stream, but possible inconsistency with the persisted state.
  • It can be combined with solution #1 when different events need different guarantees.
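A minimal sketch of this ordering, assuming hypothetical sendToBiAndFlush and persistEvent helpers:

    import scala.concurrent.{ExecutionContext, Future}

    // Sketch of solution #2 with hypothetical stubs: the event is flushed to the
    // BI down-stream first; only after that flush succeeds is it persisted. A retry
    // after a failed persist can deliver the same event twice, and BI may even see
    // events that never make it into the journal.
    object SendThenPersist {
      def sendToBiAndFlush(event: String): Future[Unit] = ??? // e.g. Kafka send + flush
      def persistEvent(event: String): Future[Unit] = ???     // journal write

      def handle(event: String)(implicit ec: ExecutionContext): Future[Unit] =
        for {
          _ <- sendToBiAndFlush(event) // down-stream first: adds to the response latency
          _ <- persistEvent(event)     // a failure here leaves BI ahead of the journal
        } yield ()
    }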


Solution #3: Connector to persistence layer

Figure 5. Connector to persistence layer.

Sometimes down-streaming the data becomes critical for the business flow and more reliable guarantees are wanted. It's possible to use tools that allow you to subscribe to the persistence layer and convert its updates into a stream.

There are plenty of market-ready solutions. For example, if Cassandra is used as the persistence for your journal and BI is a Kafka consumer, you can look at the various Kafka-to-Cassandra connectors. If Akka is used for event-sourcing in your design, look into Persistence Query.
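A rough sketch of such a read side with Akka Persistence Query, assuming akka-persistence-cassandra is the journal (sendToBi and the persistence id are placeholders):

    import akka.actor.ActorSystem
    import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
    import akka.persistence.query.PersistenceQuery
    import akka.stream.ActorMaterializer

    object ReadSideToBi extends App {
      implicit val system: ActorSystem = ActorSystem("read-side")
      implicit val materializer: ActorMaterializer = ActorMaterializer()

      // Placeholder for whatever client pushes events into the BI down-stream.
      def sendToBi(event: Any): Unit = ???

      private val readJournal = PersistenceQuery(system)
        .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

      // Live, resumable stream of the persisted events of one entity, forwarded to BI.
      readJournal
        .eventsByPersistenceId("business-flow-1", 0L, Long.MaxValue)
        .runForeach(envelope => sendToBi(envelope.event))
    }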

The differences from solution #1 are:
  • A better delivery guarantee to BI (it can be tuned to at-least-once or exactly-once).
  • Higher delivery latency.
The solution works well except for the cases when the latency of BI events is critical.
You can try to implement a custom solution - for example, for Cassandra it's possible to read the commit log and send the entries into Kafka. This allows you to tune the latency between Cassandra and Kafka to the lowest possible level - but it will always be a tradeoff: the more often you make Cassandra flush data to the commit log, the slower your persistence will be, but the faster the stream to BI.

Solution #4 Integrate into persist step

When down-streaming the events is a critical part of your flow and not being able to proceed with it means an unrecoverable failure for the application, it's logical to combine the persist action with down-streaming.


Figure 6. Integrate into persist step


It means we either persist and send the events down-stream, or fail. In the case of akka-persistence we can use a journal that both persists the events and sends them to Kafka. This combination can be found in DuplicatingJournal and StreamToKafkaJournal.

This solution:
  • Fails the flow if either the persist or the event send fails (of course, each of them can apply its own retry policies, etc.).
  • Has a delivery guarantee equal to that of the chosen down-streaming tool (with Kafka it can be "at least once" or "exactly once").
  • Response and other side effects might have worse latency than in solution #1, because persistTime = max(journalPersistDuration, eventSendFlushDuration).
  • Doesn't provide atomicity guarantees, but explicitly fails in case of inconsistency.
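A rough sketch of this persist-and-stream combination, with hypothetical stubs for the journal write and the Kafka send:

    import scala.concurrent.{ExecutionContext, Future}

    // Sketch of solution #4 with hypothetical stubs: the journal write and the Kafka
    // send are started together and the command succeeds only if both do. There is
    // no rollback - an inconsistency surfaces as an explicit failure of the flow.
    object PersistAndStream {
      def persistToJournal(event: String): Future[Unit] = ??? // journal plugin write
      def sendToKafka(event: String): Future[Unit] = ???      // down-stream flush

      def persistAndStream(event: String)(implicit ec: ExecutionContext): Future[Unit] =
        persistToJournal(event)
          .zip(sendToKafka(event)) // persistTime ~ max(journalPersistDuration, eventSendFlushDuration)
          .map(_ => ())            // either failure fails the whole command
    }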

Solution #5 Custom protocol

Let's try to solve the case when down-streaming consistency and latency are equally critical - but we can accept eventual consistency in case of failures and quasi real-time delivery ninety-nine times out of a hundred.
This means we want to continue business flow execution even if the down-stream is failing - but we rely on the eventual consistency of the down-stream: it should recover and continue from the moment it failed as soon as it is back up.

Figure 7. Custom protocol

This is the most expensive solution as it requires the implementation of a manual delivery protocol to the down-stream. Unfortunately Akka's At-Least-Once Delivery is insufficient for our case.
As an example we will use a finite-state machine for the BI consumer and couple its logic to the business flow.

Let's look at the details:
  1. A command is emitted.
  2. After validation, each command is checked for whether it is an initialization marker for the business flow. Examples of initialization commands are: start of a transaction, a user creating a shopping cart, an online game round starting, etc. Initialization events should be delivered with the strongest desired guarantee - but they should not make a huge impact on the main flow's latency, because they make up only a small percentage of all the events.
  3. If the event is initial, it gets an at-least-once delivery guarantee and the business flow simply awaits the successful completion of the down-stream flush. If the event isn't initial, it is sent to the down-stream in "fire and forget" mode.
  4. Events are persisted.
  5. Side effects are applied, one of them being sending the message to BI - fire-and-forget mode like in solution #1.
  6. The response is sent to the client.
  7. The most complicated part is BI - it implements a finite-state machine that validates the messages being received.

As an example of this solution, we can implement an FSM protocol for a service that adds together integer values and supports the commands:
  • InitializeTransaction
  • Operand(Int)
  • GetState
The rough algorithm is:
  1. The business flow receives an "InitializeTransaction" command.
  2. Because we marked this command as initial, it should be sent to BI with at-least-once consistency.
  3. BI awaits, with a timeout, the next event - EventPersisted(TransactionInitialized) - and in case of a timeout it should explicitly fetch the state of the transaction from the business flow.
  4. After the event produced by the command is persisted, the business flow should send it to BI in fire-and-forget style (no delivery guarantee).
  5. After each event, BI sets a timeout for expecting the next one or the final state - this guarantees eventual consistency between the business service and the BI down-stream.
  6. BI iterates steps 3 to 5 until it gets the final marker - transaction completed (see the state-machine sketch below).
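A rough sketch of the BI-side state machine for this adding service, with hypothetical message and state types:

    // Hypothetical message and state types for the BI-side FSM; timeouts and the
    // explicit state fetch on expiry are left out of this sketch.
    sealed trait BiMessage
    case object TransactionInitialized extends BiMessage
    final case class OperandPersisted(value: Int) extends BiMessage
    case object TransactionCompleted extends BiMessage

    sealed trait BiState
    case object AwaitingInit extends BiState
    final case class Accumulating(sum: Int) extends BiState
    final case class Completed(sum: Int) extends BiState

    object BiFsm {
      // Pure transition function: on each received message the consumer either
      // advances its state or keeps the current one (in a real system it would
      // also re-arm a timeout and fetch the authoritative state on expiry).
      def transition(state: BiState, msg: BiMessage): BiState = (state, msg) match {
        case (AwaitingInit, TransactionInitialized)       => Accumulating(0)
        case (Accumulating(sum), OperandPersisted(value)) => Accumulating(sum + value)
        case (Accumulating(sum), TransactionCompleted)    => Completed(sum)
        case (other, _)                                   => other // ignore out-of-order messages
      }
    }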


Solution #6 Kafka streams

Use Kafka Streams. Kafka Streams has become a self-sufficient platform for building event-sourcing applications and doesn't require hybridizing with other frameworks like akka-persistence.


Summary:

If you don't have to rely on strict delivery guarantees for down-streams, use a mix of Solution #1: At-most-once and Solution #2: Possible redundant events.
If delivery latency isn't critical but consistency is, use Solution #3: Connector to persistence layer. It should fit most business cases and is supported out of the box by most event-sourcing frameworks.
Solution #4: Integrate into persist step emulates a transaction without the possibility of rollback and is suitable only for special business cases.
When down-streaming latency is critical, Solution #5: Custom protocol or Solution #6: Kafka streams are the ways to go. Implementing and supporting a custom protocol is the most expensive option; on the other hand, it validates your main business flow and can in theory provide good monitoring feedback. Using Kafka Streams has its own limitations and isn't suitable for all business domains.

Monday 17 September 2018

ScalaCache - conditional caching

Sometimes it's important to avoid caching some subset of return values. It's easy to implement using the memoization method memoizeF. When using an M[_] like Future, the failed case won't be cached, so it's possible to convert any value into a failure with a special marker exception that contains the value itself while blocking it from being cached.

Sometimes it's preferable to avoid caching some subset of possible values without resorting to the failed case of the higher-kinded wrapper. If this condition can be expressed as a predicate and you don't want to play with the implicit mode: Mode[F], you can mix a small trait into your cache.
This example is based on Caffeine and doesn't cache negative integer values:
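(A minimal sketch of the predicate-based idea, shown here against the underlying Caffeine API directly rather than as a ScalaCache trait mixin; the cachedCompute helper and key scheme are illustrative only.)

    import com.github.benmanes.caffeine.cache.{Cache, Caffeine}

    // Sketch using the Caffeine API directly (not the ScalaCache trait mixin):
    // negative results are computed and returned, but never stored in the cache.
    object ConditionalCaching {
      private val underlying: Cache[String, java.lang.Integer] =
        Caffeine.newBuilder().maximumSize(1000L).build[String, java.lang.Integer]()

      def cachedCompute(key: String)(compute: => Int): Int =
        Option(underlying.getIfPresent(key)).map(_.intValue).getOrElse {
          val value = compute
          if (value >= 0) underlying.put(key, value) // cache only non-negative values
          value
        }
    }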

Wednesday 5 September 2018

javax.ws.rs.container.ContainerRequestFilter detracts from @Suspended

If you are still obliged to use the Java API for RESTful Web Services but are building a non-blocking API with the help of:

    @Suspended asyncResponse: AsyncResponse

Keep in mind that javax.ws.rs.container.ContainerRequestFilter doesn't support the non-blocking nature, and if it's part of the invocation chain, it will block the calling thread and defeat the purpose of @Suspended.

The best workaround is to migrate to functional-style code and use the loan pattern where possible.
Code can look like:
    @GET
    @Produces("text/html")
    def handleRequest(@Suspended res: AsyncResponse): Unit = {
      authenticateBasicAsync(realm = "secure site") {
        userName =>
          res.resume(s"The user is '$userName'")
      }
    }

This code will be easier to migrate in the future to akka-http or other async HTTP libraries.

Tuesday 4 September 2018

Cheat sheet: Team assessment

Here is a summary of my experience on what every team should avoid or at least minimize. It's agnostic to the methodology (or its absence) on the project - just a set of common-sense advice.

It helps to assess the team during on-boarding or to check it periodically - for example at retrospective meetings.

Team should avoid:
  • Unclear goals
  • When the methodology for achieving the goal has not been agreed upon.
  • Absence of task priorities.
  • Inability to tell justified "No" to extra load.
  • A large number of tasks that require high concentration.
  • The prospects of the tasks aren't visible (when the ultimate goals of the task are unclear).
  • Lack of motivation.
  • Noise in the working space.
  • Insufficient coordination between the colleagues.
  • Insufficient delegation of work.
  • Insufficiently organized storage of information (and knowledge sharing).
  • Too many meetings.