# Streaming Request/Response Pipeline
Services often need common infrastructure functionality, either shared across different squbs services or mandated as an organizational standard. Such infrastructure includes, but is not limited to, logging, request tracing, authentication/authorization, tracking, cookie management, and A/B testing.
Because squbs promotes separation of concerns, this logic belongs to infrastructure and not to the service implementation. The squbs streaming pipeline lets infrastructure provide components that are installed into a service without the service owner having to handle these aspects in their service or actors.
Generally speaking, a squbs streaming pipeline is a `BidiFlow` that acts as a bridge between the Akka HTTP layer and the squbs service. That is to say:
- All request messages sent from Akka HTTP to the squbs service go through the pipeline.
- Conversely, all response messages sent from the squbs service go through the pipeline.
In `squbs-meta.conf`, you can specify the pipeline for your service:

    squbs-services = [
      {
        class-name = org.squbs.sample.MyActor
        web-context = mypath
        pipeline = dummyflow
      }
    ]
- If there is no custom pipeline for a squbs-service, just omit the `pipeline` setting.
- Default pre/post flows specified via the configuration below are automatically connected to the pipeline unless `defaultPipelineOn` is set to `false`:

      squbs.pipeline.streaming.defaults {
        pre-flow = defaultPreFlow
        post-flow = defaultPostFlow
      }

With the above configuration, the pipeline would look like:

                     +---------+   +---------+   +---------+   +---------+
    RequestContext ~>|         |~> |         |~> |         |~> |         |
                     | default |   |  dummy  |   | default |   |  squbs  |
                     | PreFlow |   |  flow   |   | PostFlow|   | service |
    RequestContext <~|         |<~ |         |<~ |         |<~ |         |
                     +---------+   +---------+   +---------+   +---------+
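
The traversal order in the diagram can be reproduced with plain Akka Streams. The sketch below is not squbs code: it assumes Akka 2.6 or later (where an implicit `ActorSystem` provides the materializer), uses `String` in place of `RequestContext`, and the `stage` helper and `PipelineOrderSketch` object are hypothetical names introduced only for illustration. Each stage appends its name on the request side and again on the response side, so the result shows the order in which the stages are visited.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object PipelineOrderSketch {

  // Hypothetical helper: a bidi stage that appends its name to the message
  // on the request side and again on the response side.
  def stage(name: String): BidiFlow[String, String, String, String, NotUsed] =
    BidiFlow.fromFunctions(
      (req: String) => s"$req > $name",
      (res: String) => s"$res > $name")

  // Stand-in for the squbs service: turns a request into a response.
  val service: Flow[String, String, NotUsed] =
    Flow[String].map(req => s"$req > service")

  // atop stacks bidi stages; join closes the stack with the service flow.
  val pipeline: Flow[String, String, NotUsed] =
    stage("defaultPreFlow")
      .atop(stage("dummyflow"))
      .atop(stage("defaultPostFlow"))
      .join(service)

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("pipeline-order-sketch")
    try Await.result(Source.single("request").via(pipeline).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Running `main` prints `request > defaultPreFlow > dummyflow > defaultPostFlow > service > defaultPostFlow > dummyflow > defaultPreFlow`: the request passes the stages left to right, and the response passes them in reverse.
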
`RequestContext` is basically a wrapper around `HttpRequest` and `HttpResponse`, which also allows carrying context information.
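
To make the wrapper idea concrete, here is a minimal sketch in plain Scala. `SketchContext` is a hypothetical, simplified stand-in and not the squbs `RequestContext`: it uses strings where the real class wraps Akka HTTP's `HttpRequest` and `HttpResponse`, and the `withAttribute` and `withResponse` method names are assumptions made for this illustration only.

```scala
object ContextSketch {

  // Hypothetical stand-in: one value that travels through the pipeline and
  // carries the request, the (optional) response, and extra context data.
  final case class SketchContext(
      request: String,
      response: Option[String] = None,
      attributes: Map[String, Any] = Map.empty) {

    // Returns a copy carrying one more piece of context information.
    def withAttribute(name: String, value: Any): SketchContext =
      copy(attributes = attributes + (name -> value))

    // Returns a copy with the response filled in.
    def withResponse(res: String): SketchContext =
      copy(response = Some(res))
  }

  def demo(): SketchContext =
    SketchContext("GET /mypath")
      .withAttribute("traceId", "abc-123") // set on the request side
      .withResponse("200 OK")              // set by the service

  def main(args: Array[String]): Unit = {
    val rc = demo()
    // The attribute set on the request side is still available on the response side.
    println(s"${rc.request} -> ${rc.response.get} (traceId=${rc.attributes("traceId")})")
  }
}
```

Because the same value flows in both directions, a stage on the response side can read what a stage on the request side stored earlier.
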
A bidi flow can be specified as below:

    dummyflow {
      type = squbs.pipelineflow
      factory = org.squbs.sample.DummyBidiFlow
    }
- `type`: identifies the configuration as a `squbs.pipelineflow`.
- `factory`: the factory class used to create the `BidiFlow`.
A sample `DummyBidiFlow` looks like below:

    class DummyBidiFlow extends PipelineFlowFactory {

      override def create(implicit system: ActorSystem): PipelineFlow = {
        BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
          // Request side: add a header to every incoming request.
          val inbound = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("DummyRequest", "ReqValue")) })
          // Response side: add a header to every outgoing response.
          val outbound = b.add(Flow[RequestContext].map { rc => rc.addResponseHeader(RawHeader("DummyResponse", "ResValue")) })
          BidiShape.fromFlows(inbound, outbound)
        })
      }
    }

In certain scenarios, a stage in the pipeline may need to abort the flow and return an `HttpResponse`, e.g., when authentication or authorization fails. In such scenarios, the rest of the pipeline should be skipped and the request should not reach the squbs service. To skip the rest of the flow:
- Add the flow to the builder with `abortable`, e.g., `b.add(authorization abortable)`.
- Call `abortWith` on the `RequestContext` with an `HttpResponse` when you need to abort.
In the `DummyAbortableBidiFlow` example below, `authorization` is a bidi flow added with `abortable`, and it aborts the flow if the user is not authorized:

    class DummyAbortableBidiFlow extends PipelineFlowFactory {

      override def create(implicit system: ActorSystem): PipelineFlow = {
        BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
          import GraphDSL.Implicits._

          val inboundA = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("keyInA", "valInA")) })
          val inboundC = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("keyInC", "valInC")) })
          val outboundA = b.add(Flow[RequestContext].map { rc => rc.addResponseHeaders(RawHeader("keyOutA", "valOutA")) })
          val outboundC = b.add(Flow[RequestContext].map { rc => rc.addResponseHeaders(RawHeader("keyOutC", "valOutC")) })
          // Adding the flow with abortable lets it skip the rest of the pipeline.
          val inboundOutboundB = b.add(authorization abortable)

          inboundA ~> inboundOutboundB.in1
          inboundOutboundB.out1 ~> inboundC
          inboundOutboundB.in2 <~ outboundC
          outboundA <~ inboundOutboundB.out2

          BidiShape(inboundA.in, inboundC.out, outboundC.in, outboundA.out)
        })
      }

      val authorization = BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
        // isAuthorized is not defined in this snippet; it stands for the actual authorization check.
        val authorization = b.add(Flow[RequestContext] map { rc =>
          if (!isAuthorized) rc.abortWith(HttpResponse(StatusCodes.Unauthorized, entity = "Not Authorized!"))
          else rc
        })
        val noneFlow = b.add(Flow[RequestContext]) // Do nothing
        BidiShape.fromFlows(authorization, noneFlow)
      })
    }

Once a flow is added with `abortable`, a bidi flow gets connected to it. This bidi flow checks whether the `RequestContext` carries an `HttpResponse` and either bypasses the rest of the pipeline or sends the request downstream. Here is how the above `DummyAbortableBidiFlow` looks:

                                                    +-----------------------------------+
                                                    |  +-----------+    +-----------+   |      +-----------+
                       +-----------+   +---------+  |  |           | ~> |  filter   o~~~0 ~>   |           |
                       |           |   |         |  |  |           |    |not aborted|   |      | inboundC  | ~> RequestContext
    RequestContext ~>  | inboundA  |~> |         |~>0~~o broadcast |    +-----------+   |      |           |
                       |           |   |         |  |  |           |                    |      +-----------+
                       +-----------+   |         |  |  |           | ~> +-----------+   |
                                       | inbound |  |  +-----------+    |  filter   |   |
                                       | outbound|  |                   |  aborted  |   |
                       +-----------+   | B       |  |  +-----------+ <~ +-----------+   |      +-----------+
                       |           |   |         |  |  |           |                    |      |           |
    RequestContext <~  | outboundA | <~|         |<~0~~o merge     |                    |      | outboundC | <~ RequestContext
                       |           |   |         |  |  |           o~~~~~~~~~~~~~~~~~~~~0 <~   |           |
                       +-----------+   +---------+  |  +-----------+                    |      +-----------+
                                                    +-----------------------------------+
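
The bypass in the diagram can be modeled with plain Akka Streams building blocks. The following is a sketch of the idea only, not the squbs implementation: it assumes Akka 2.6 or later, replaces `RequestContext` with a hypothetical `Ctx` case class whose `response` field marks an aborted context, and uses a made-up rule (only the user `admin` is authorized) in place of a real authorization check.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.BidiShape
import akka.stream.scaladsl.{BidiFlow, Broadcast, Flow, GraphDSL, Merge, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object AbortBypassSketch {

  // Hypothetical stand-in for RequestContext: a response that is already set
  // on the request side marks the context as aborted.
  final case class Ctx(user: String, response: Option[String] = None)

  // Request side sets a response for unauthorized users; response side does nothing.
  val authorization: BidiFlow[Ctx, Ctx, Ctx, Ctx, NotUsed] =
    BidiFlow.fromFunctions(
      (rc: Ctx) => if (rc.user == "admin") rc else rc.copy(response = Some("401 Not Authorized!")),
      (rc: Ctx) => rc)

  // The bypass from the diagram: a broadcast, two filters, and a merge.
  val bypass: BidiFlow[Ctx, Ctx, Ctx, Ctx, NotUsed] =
    BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val broadcast  = b.add(Broadcast[Ctx](2))
      val notAborted = b.add(Flow[Ctx].filter(_.response.isEmpty))
      val aborted    = b.add(Flow[Ctx].filter(_.response.nonEmpty))
      val merge      = b.add(Merge[Ctx](2))

      broadcast.out(0) ~> notAborted             // continues downstream
      broadcast.out(1) ~> aborted ~> merge.in(0) // jumps straight to the response side

      BidiShape(broadcast.in, notAborted.out, merge.in(1), merge.out)
    })

  // Stand-in for the rest of the pipeline plus the squbs service.
  val service: Flow[Ctx, Ctx, NotUsed] =
    Flow[Ctx].map(rc => rc.copy(response = Some(s"200 Hello, ${rc.user}")))

  def run(): Set[Ctx] = {
    implicit val system: ActorSystem = ActorSystem("abort-bypass-sketch")
    try {
      val result = Source(List(Ctx("admin"), Ctx("guest")))
        .via(authorization.atop(bypass).join(service))
        .runWith(Sink.seq)
      Await.result(result, 5.seconds).toSet
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

In this sketch the `guest` context never reaches `service`; it comes back with the `401` response set by `authorization`, while the `admin` context is answered by the service. The result is collected into a `Set` because this simple merge makes no promise about the relative order of aborted and non-aborted responses.
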