Tuesday, 7 August 2018

Type level programming for Streams

Programming at this level is often considered an obscure art with little practical value, but it need not be so.
Conrad Parker (about Type-Level programming in Haskell)
 
There are many domains where events arrive in a stream-like way. The best example is services based on the HTTP protocol.

Languages with an imperative legacy, such as Java, provide an abstraction over the streaming nature of HTTP services: the stream is converted into a branch-like structure, and each leaf handles the invariants for its possible inputs. It has become popular to design systems in an RMI style.

A regular handler for an HTTP or RPC service looks like:

class Controller {
  def method1(params ...): Response = ...
  def methodN(params ...): Response = ...
}
This means that all the magic of routing HTTP requests to methods is provided by the framework.

Of course, there is hidden complexity behind this, such as filters, handlers, etc.

As reactive and functional programming gain popularity, streaming DSLs are becoming a popular way to manage the complexity of flow handling.

/*
 * In ~> Validate ~> SaveToStorage ~> generateEvent ~> sendEvent ~> Sink
 */
/*
 *        +------+
 *  In1 ~>|      |~> Out1
 *        | bidi |
 * Out2 <~|      |<~ In2
 *        +------+
 */
/* construct protocol stack
 *         +------------------------------------+
 *         | stack                              |
 *         |                                    |
 *         |  +-------+            +---------+  |
 *    ~>   O~~o       |     ~>     |         o~~O    ~>
 * Message |  | codec | ByteString | framing |  | ByteString
 *    <~   O~~o       |     <~     |         o~~O    <~
 *         |  +-------+            +---------+  |
 *         +------------------------------------+
 */
These flows are self-explanatory and easier to maintain or change in the future.

Most simple domain problems can easily be solved as a stream of events from an input to a sink, like:
  In ~> Step1 ~> Step2 ~> StepN ~> Sink. 

Even if your DSL isn't sexy enough to look like a math diagram, you can use tools to visualise the flow nicely (for example, travesty).

Of course, there is always extra complexity such as filtering, conditional branching, throttling, backpressure, etc. Stream-like data handling feels natural and fits functional programming well, where every flow step is a function that accepts an In type and returns an Out type:

  type Step1 = In1 => Out1
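
Because each step is an ordinary function, two steps can be joined with the standard `andThen` method on `Function1`. The following is a minimal sketch of that idea; the message types `In1`, `Out1`, `Out2` and their fields are hypothetical stand-ins invented for the illustration, not types from a real service.

```scala
// Hypothetical message types, introduced only for this sketch.
final case class In1(raw: String)
final case class Out1(length: Int)
final case class Out2(report: String)

object PipingSketch {
  type Step1 = In1 => Out1
  type Step2 = Out1 => Out2

  val step1: Step1 = in => Out1(in.raw.length)
  val step2: Step2 = out => Out2(s"length=${out.length}")

  // andThen feeds the output of step1 into step2. The pipeline only
  // compiles when the Out type of one step is the In type of the next.
  val pipeline: In1 => Out2 = step1 andThen step2
}
```

Swapping the order (`step2 andThen step1`) is rejected by the compiler, which is the kind of safety the rest of this post tries to keep once a step has several input variants.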

Piping the step functions together is a natural solution. Let's assume concrete input types: ShipmentsRequest, TrackRequest, PricingRequest and Ping:

// Input variants
case class ShipmentsRequest(param: InParam1)
case class TrackRequest(param: InParam2)
case class PricingRequest(param: InParam3)
case object Ping
// output variants
case class ShipmentsResponse(body: OutParam1)
case class TrackResponse(body: OutParam1)
case class PricingResponse(body: OutParam1)
case object Pong
The simplest way to handle those types is to operate on the Any type:

def stepProcess(in: Any): Any = {
  in match {
    case shipment: ShipmentsRequest => ShipmentsResponse.generate()
    case track: TrackRequest => TrackResponse.generate()
    case pricing: PricingRequest => PricingResponse.generate()
    case Ping => Pong
  }
}
Using Any makes it possible to pipe (compose) the functions. Unfortunately, the compiler doesn't help us with any extra checks to make the piping safe.
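
To make the missing safety concrete, here is a small sketch using only the standard library. The two steps and the pipeline names are assumptions made up for the illustration; the point is that a pipeline wired in the wrong order still compiles and only fails when a message flows through it.

```scala
object AnyPipingSketch {
  case object Ping
  case object Pong

  // Both steps are typed Any => Any, in the same spirit as stepProcess.
  val pingStep: Any => Any = { case Ping => Pong }
  val pongStep: Any => Any = { case Pong => "done" }

  // Correct order: Ping -> Pong -> "done".
  val goodPipeline: Any => Any = pingStep andThen pongStep

  // Wrong order still compiles, because every step accepts Any.
  val badPipeline: Any => Any = pongStep andThen pingStep

  // The mistake surfaces only at runtime, as a MatchError.
  def failsAtRuntime: Boolean =
    scala.util.Try(badPipeline(Ping)).isFailure
}
```
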
We can follow best practice and use a sealed marker trait:

// Input variants
sealed trait InMessage
case class ShipmentsRequest(param: InParam1) extends InMessage
case class TrackRequest(param: InParam2) extends InMessage
case class PricingRequest(param: InParam3) extends InMessage
case object Ping extends InMessage
// output variants
sealed trait OutMessage
case class ShipmentsResponse(body: OutParam1) extends OutMessage
case class TrackResponse(body: OutParam1) extends OutMessage
case class PricingResponse(body: OutParam1) extends OutMessage
case object Pong extends OutMessage
def stepProcess(in: InMessage): OutMessage = {
  in match {
    case shipment: ShipmentsRequest => ShipmentsResponse.generate()
    case track: TrackRequest => TrackResponse.generate()
    case pricing: PricingRequest => PricingResponse.generate()
    case Ping => Pong
  }
}

Now the compiler helps us find out if we have forgotten to handle the Ping message:

def stepProcess(in: InMessage): OutMessage = {
  in match {
    case shipment: ShipmentsRequest => ShipmentsResponse.generate()
    case track: TrackRequest => TrackResponse.generate()
    case pricing: PricingRequest => PricingResponse.generate()
    // case Ping => Pong
    // match may not be exhaustive.
    // [warn] It would fail on the following input: Ping
    // [warn] in match {
    // [warn] ^
  }
}
It's still just a compiler warning, and it's not always possible to escalate it to an error, for example if you are not allowed to change the compiler arguments. It's also not always the case that all incoming messages can extend a common base trait.

There is another way to represent the input types: via a union (sum) type. For example, Haskell supports declarations like this:

data Bool = True | False

In Haskell this evolved into the DataKinds language extension. Scala plans to support this at the language level as well, as union types in the Dotty compiler.
There is a fork of Scala, Typelevel Scala, that aims to bring it earlier than Dotty. We will use the Shapeless library for a type-level programming solution.
In Scala we can define our input type as a chain of nested Either types:

type InType = Either[ShipmentsRequest, Either[TrackRequest, Either[PricingRequest, Either[Ping.type, CNil]]]]
// match the PricingRequest
inMessage match {
  case Right(Right(Left(value))) => value
}
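
The nested encoding can be tried out with the standard library alone. The sketch below is an illustration under assumptions: the request types are simplified stand-ins with `String` payloads, and `Nothing` is used in place of Shapeless's `CNil` as the terminator so that no extra dependency is needed.

```scala
object NestedEitherSketch {
  // Simplified stand-ins for the request types (String payloads are assumed).
  final case class ShipmentsRequest(param: String)
  final case class TrackRequest(param: String)
  final case class PricingRequest(param: String)
  case object Ping

  // Nothing terminates the chain here: it has no values, like CNil.
  type InType =
    Either[ShipmentsRequest, Either[TrackRequest, Either[PricingRequest, Either[Ping.type, Nothing]]]]

  // Each alternative is addressed by its position in the chain.
  val shipments: InType = Left(ShipmentsRequest("s-1"))
  val pricing: InType   = Right(Right(Left(PricingRequest("p-1"))))
  val ping: InType      = Right(Right(Right(Left(Ping))))

  def describe(in: InType): String = in match {
    case Left(s)                       => s"shipments ${s.param}"
    case Right(Left(t))                => s"track ${t.param}"
    case Right(Right(Left(p)))         => s"pricing ${p.param}"
    case Right(Right(Right(Left(_))))  => "ping"
    case Right(Right(Right(Right(n)))) => n // cannot happen: n has type Nothing
  }
}
```

The positional `Right(Right(Left(...)))` wrapping works, but it is noisy and easy to get wrong by one level, which is the motivation for a library-provided syntax.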

The Shapeless library provides some syntactic sugar for this encoding. Here is the same type expressed with Shapeless:

import shapeless.{:+:, CNil, Poly1}
type In1 = ShipmentsRequest :+: TrackRequest :+: PricingRequest :+: Ping.type :+: CNil
Then Step 1 can be defined as a Poly1 function with a case for every member type of the In1 union (coproduct):

object Step1Poly extends Poly1 {
  implicit def caseShipments = at[ShipmentsRequest](shipments => ShipmentsResponse.generate())
  implicit def caseTrack = at[TrackRequest](track => TrackResponse.generate())
  implicit def casePricing = at[PricingRequest](pricing => PricingResponse.generate())
  implicit def casePing = at[Ping.type](_ => Pong)
}
def step1(in1: In1): ShipmentsResponse :+: TrackResponse :+: PricingResponse :+: Pong.type :+: CNil =
  in1.map(Step1Poly)

The result type of that step is ShipmentsResponse or TrackResponse or PricingResponse or Pong. Handling of all the cases is checked at compile time.
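
To see how a value gets into such a coproduct and how the result is read back, here is a reduced sketch. It assumes Shapeless 2.3.x is on the classpath, and it uses a two-member coproduct with simplified, hypothetical payloads (`id: String`) to stay short; it is an illustration of the technique, not the exact types used above.

```scala
import shapeless.{:+:, CNil, Coproduct, Poly1}

object CoproductSketch {
  // Simplified stand-ins; the id fields are assumptions for this sketch.
  final case class TrackRequest(id: String)
  final case class TrackResponse(id: String)
  case object Ping
  case object Pong

  type In  = TrackRequest :+: Ping.type :+: CNil
  type Out = TrackResponse :+: Pong.type :+: CNil

  object Step extends Poly1 {
    implicit def caseTrack: Case.Aux[TrackRequest, TrackResponse] =
      at[TrackRequest](t => TrackResponse(t.id))
    implicit def casePing: Case.Aux[Ping.type, Pong.type] =
      at[Ping.type](_ => Pong)
  }

  // Compiles only if Step has a case for every member of In.
  def step(in: In): Out = in.map(Step)

  // Coproduct[In](value) injects a value into its matching alternative.
  val track: In = Coproduct[In](TrackRequest("t-1"))
  val ping: In  = Coproduct[In](Ping)
}
```

Calling `select[T]` on the result returns an `Option[T]` for one alternative. Deleting `casePing` from `Step` turns the `in.map(Step)` call into a compile error rather than a warning, since the required implicit evidence can no longer be found.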

A full example for Akka Streams can look like:

import akka.stream.scaladsl.{Sink, Source}
import shapeless.{:+:, CNil, Poly1}

type In1 = ShipmentsRequest :+: TrackRequest :+: PricingRequest :+: Ping.type :+: CNil

// Maps every request variant to its response variant
object Step1Poly extends Poly1 {
  implicit def caseShipments = at[ShipmentsRequest](shipments => ShipmentsResponse.generate())
  implicit def caseTrack = at[TrackRequest](track => TrackResponse.generate())
  implicit def casePricing = at[PricingRequest](pricing => PricingResponse.generate())
  implicit def casePing = at[Ping.type](_ => Pong)
}

// Consumes every response variant, producing Unit
object StepForeach extends Poly1 {
  implicit def caseShipments = at[ShipmentsResponse](shipments => ())
  implicit def caseTrack = at[TrackResponse](track => ())
  implicit def casePricing = at[PricingResponse](pricing => ())
  implicit def casePing = at[Pong.type](_ => ())
}

Source
  .empty[In1]
  .map(_.map(Step1Poly))
  .to(Sink.foreach(_.map(StepForeach).unify))
