Programming at this level is often considered an obscure art with little practical value, but it need not be so.
Conrad Parker (about Type-Level programming in Haskell)
There are a lot of domains where events are coming in stream-like way. The best one example are services based on http protocol.
The languages like Java with imperative programming legacy background are providing a sort of abstraction over stream nature of Http services - where stream is converted to branch-like structure and each leaf is handling in-variants for possible inputs. It's became popular to design systems in RMI way.
Regular handler for HTTP or RPC service is looking like:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Controller { | |
def method1(params ...): Response = ... | |
def methodN(params ...): Response = ... | |
} |
Of course there is a hidden complexity behind this - like Filter, Handlers etc.
While Reactive and Functional Programming gain some popularity, streaming DSL's gain popularity in solving Flow handling complexity.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* In ~> Validate ~> SaveToStorage ~> generateEvent ~> sendEvent ~> Sink | |
*/ | |
/* | |
* +------+ | |
* In1 ~>| |~> Out1 | |
* | bidi | | |
* Out2 <~| |<~ In2 | |
* +------+ | |
*/ | |
/* construct protocol stack | |
* +------------------------------------+ | |
* | stack | | |
* | | | |
* | +-------+ +---------+ | | |
* ~> O~~o | ~> | o~~O ~> | |
* Message | | codec | ByteString | framing | | ByteString | |
* <~ O~~o | <~ | o~~O <~ | |
* | +-------+ +---------+ | | |
* +------------------------------------+ | |
*/ |
The most of primitive domain problems can be easy solved via stream of events from Input to to Sink, like:
In ~> Step1 ~> Step2 ~> StepN ~> Sink.
Even if your DSL isn't sexy enough to look like math diagram, you can use some tools to visualise flow in nice ways (for example: travesty)
Of course there is always extra complexity like filtering, conditioning, throttling, backpressure etc. Stream like data handling is looking natural and mostly befit the functional programming, where every flow step is a function that accept In type and returns Out type:
type Step1 = In1 => Out1
Piping the step functions is looking natural solution. Lest assume concrete input types: ShipmentsRequest, TrackRequest, PricingRequest and Ping:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Input variants | |
case class ShipmentsRequest(param: InParam1) | |
case class TrackRequest(param: InParam2) | |
case class PricingRequest(param: InParam3) | |
case object Ping | |
// output variants | |
case class ShipmentsResponse(body: OutParam1) | |
case class TrackResponse(body: OutParam1) | |
case class PricingResponse(body: OutParam1) | |
case object Pong |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def stepProcess(in: Any): Any = { | |
in match { | |
case shipment: ShipmentsRequest => ShipmentsResponse.generate() | |
case track : TrackRequest => TrackResponse.generate() | |
case pricing : PricingRequest => PricingResponse.generate() | |
case Ping => Pong | |
} | |
} |
We can use best practices and operated with the marker trait:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Input variants | |
sealed trait InMessage | |
case class ShipmentsRequest(param: InParam1) extends InMessage | |
case class TrackRequest(param: InParam2) extends InMessage | |
case class PricingRequest(param: InParam3) extends InMessage | |
case object Ping extends InMessage | |
// output variants | |
sealed trait OutMessage | |
case class ShipmentsResponse(body: OutParam1) extends OutMessage | |
case class TrackResponse(body: OutParam1) extends OutMessage | |
case class PricingResponse(body: OutParam1) extends OutMessage | |
case object Pong extends OutMessage | |
def stepProcess(in: InMessage): OutMessage = { | |
in match { | |
case shipment: ShipmentsRequest => ShipmentsResponse.generate() | |
case track : TrackRequest => TrackResponse.generate() | |
case pricing : PricingRequest => PricingResponse.generate() | |
case Ping => Pong | |
} |
Now compiler helps us to find if we have forgotten to handle Ping message:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def stepProcess(in: InMessage): OutMessage = { | |
in match { | |
case shipment: ShipmentsRequest => ShipmentsResponse.generate() | |
case track: TrackRequest => TrackResponse.generate() | |
case pricing: PricingRequest => PricingResponse.generate() | |
// case Ping => Pong | |
// match may not be exhaustive. | |
// [warn] It would fail on the following input: Ping | |
// [warn] in match { | |
// [warn] ^ | |
} |
There is other way to present input types - via union type. For example Haskell support declarations like this:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
data Bool = True | False |
In Haskell it's evolved into Data Kinds language extension. Scala is planning to support this on language level as well in Dotty compiler as Union Types.
There is a fork of Scala - Typelevel Scala that aims to bring it early than Dotty. We will use Shapeless library for type level programming solution.
In Scala we can define our input type as a chain of embedded Either types:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type InType = Either[ShipmentsRequest, Either[TrackRequest, Either[PricingRequest, Either[Ping.type, CNil]]]] | |
// match the PricingRequest | |
inMessage match { | |
case Right(Right(Left(value))) => value | |
} |
Shapeless library implements some syntax sugar for that solution, here is the same code expressed via the Shapeless:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import shapeless.{:+:, CNil, Poly1} | |
type In1 = ShipmentsRequest :+: TrackRequest :+: PricingRequest :+: Ping.type :+: CNil |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object Step1Poly extends Poly1 { | |
implicit def caseShipments = at[ShipmentsRequest](shipments => ShipmentsResponse.generate()) | |
implicit def caseTrack = at[TrackRequest](track => TrackResponse.generate()) | |
implicit def casePricing = at[PricingRequest](pricing => PricingResponse.generate()) | |
implicit def casePing = at[Ping.type](_ => Pong) | |
} | |
def step1(in1: In1): ShipmentsResponse :+: TrackResponse :+: PricingResponse :+: Pong.type :+: CNil = | |
in1.map(Step1Poly) |
As a result type of that step is ShipmentsResponse or TrackResponse or PricingResponse or Pong. The handling for all the cases is checked during the compilation time.
The full example for akka streams can look like:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type In1 = ShipmentsRequest :+: TrackRequest :+: PricingRequest :+: Ping.type :+: CNil | |
object Step1Poly extends Poly1 { | |
implicit def caseShipments = at[ShipmentsRequest](shipments => ShipmentsResponse.generate()) | |
implicit def caseTrack = at[TrackRequest](track => TrackResponse.generate()) | |
implicit def casePricing = at[PricingRequest](pricing => PricingResponse.generate()) | |
implicit def casePing = at[Ping.type](_ => Pong) | |
} | |
object StepForeach extends Poly1 { | |
implicit def caseShipments = at[ShipmentsResponse](shipments => ()) | |
implicit def caseTrack = at[TrackResponse](track => ()) | |
implicit def casePricing = at[PricingResponse](pricing => ()) | |
implicit def casePing = at[Pong.type](_ => ()) | |
} | |
Source | |
.empty[In1] | |
.map(_.map(Step1Poly)) | |
.to(Sink.foreach(_.map(StepForeach).unify)) |
No comments:
Post a Comment