Freak is primarily composed of two components, Engine and Flow, which are further divided into smaller components. Engine is encapsulation of logic on how to read flows and execute them in order of specified definition. Engine does this using components:
- Butler
- Executor
- Inspector
Before explaining about these components, it is a good idea to understand what exactly is a flow in a Freak and how it is implemented. A flow is nothing but a set of steps intended to achieve an objective. This objective can be anything that is identifiable as business logic. The implementation of flow is done by defining three components:
Flow decorator, it is used to identify steps of the flow and enforce policies on how it is used or executed.
Locator, it uses flow decorator to locate steps defined in a module (module here is used as reference to single python file). Every flow is required to define one of its own, so that
can use it to identify and execute the flow.
Since, we have a basic understanding of what exactly a flow in freak is, let's move on to engine components.
Butler is responsible for reading python modules, locating steps and organizing them using locator specified by flow.
Executor is core component of the engine. It is responsible for executing steps. Currently, it only supports linear and choice-based flows.
Inspector is used to return input schema for every step defined by the flow. This is intended to be part of view logic of the engine.
- Linear Flows
- Choice Flows
Freak is currently under active development. Following code should give you an idea how it implements data flows.
This is how you define a flow using base flow.
from freak.flows.base_flow import base_flow
from freak.models.input import InputModel, InputModelB
from freak.models.request import RequestContext
from freak.models.response import Response, SuccessResponseContext
from freak.types import Flow
def func_one(ctx: RequestContext) -> SuccessResponseContext:
a = ctx.input["a"]
b = ctx.input["b"]
return SuccessResponseContext(
input=ctx.input, output={"a": a + 1, "b": b + 2}
def func_two(ctx: RequestContext) -> SuccessResponseContext:
a = ctx.input["a"]
b = ctx.input["b"]
return SuccessResponseContext(
output={"a": a + 2, "b": b + 3},
def func_three(ctx: RequestContext) -> SuccessResponseContext:
a = ctx.input["a"]
b = ctx.input["b"]
return SuccessResponseContext(
output={"a": a + 3, "b": b + 4},
def func_four(ctx: RequestContext) -> Response:
a = ctx.input["a"]
b = ctx.input["b"]
c = ctx.input["c"]
return SuccessResponseContext(
output={"a": a + 4, "b": b + 5, "c": c + 6},
Following test case will use above defintion to execute the flow.
from freak.provider import EngineProvider
def test_base_flow_prosecutioner():
engine = EngineProvider(flow_name="base_flow").engine
executioner = engine(module_name=__name__, decorator_name="base_flow")
response = executioner.execute(data={"a": 4, "b": 7}, from_step="func_one")
output, path_traversed = response
assert path_traversed == {
"last_step": "func_three",
"traversed": {
"func_one": ["func_two"],
"func_two": ["func_three"],
"func_three": ["func_four"],
responses = output.responses
assert output.from_step == 1
assert output.to_step == 4
assert output.last_successful_step == 3
assert responses[0].output == {"a": 5, "b": 9}
assert responses[1].output == {"a": 6, "b": 10}
assert responses[2].output == {"a": 7, "b": 11}
assert responses[3].success == False
assert responses[3].output == {}
assert (
== "Variable: c | Type: value_error.missing | Message: field required"
response = executioner.execute(
data={"a": 4, "b": 7, "c": 5},
output, path_traversed = response
responses = output.responses
assert len(responses) == 1
assert responses[0].output == {"a": 8, "b": 12, "c": 11}
assert output.last_successful_step == 4
assert output.from_step == 4
assert output.to_step == 4
assert path_traversed == {
"last_step": "func_four",
"traversed": {
"func_one": ["func_two"],
"func_two": ["func_three"],
"func_three": ["func_four"],
"func_four": [],
Using above code, it is also possible to generate input schema for every step. Following test case will demonstrate this behaviour.
from freak.provider import EngineProvider
def test_base_flow_fetch_schema():
engine = EngineProvider(flow_name="base_flow").engine
executioner = engine(module_name=__name__, decorator_name="base_flow")
responses = executioner.inspect()
input_model_b_schema = {
"title": "InputModelB",
"description": "Class for defining structure of request data.",
"type": "object",
"properties": {
"a": {"title": "A", "type": "integer"},
"b": {"title": "B", "type": "integer"},
"c": {"title": "C", "type": "integer"},
"required": ["a", "b", "c"],
input_model_schema = {
"title": "InputModel",
"description": "Class for defining structure of request data.",
"type": "object",
"properties": {
"a": {"title": "A", "type": "integer"},
"b": {"title": "B", "type": "integer"},
"required": ["a", "b"],
assert input_model_schema == InputModel.schema()
assert input_model_b_schema == InputModelB.schema()
assert executioner.flow.predecessor == responses["graph"]
schema_info = responses["schema"]
assert schema_info["func_one"]["schema"] == input_model_schema
assert schema_info["func_two"]["schema"] == input_model_schema
assert schema_info["func_three"]["schema"] == input_model_schema
assert schema_info["func_four"]["schema"] == input_model_b_schema
This project is licensed under the terms of the MIT
license. See LICENSE for more details.
author = {transhapHigsn},
title = {Freak is a data flow engine.},
year = {2020},
publisher = {GitHub},
journal = {GitHub repository},
howpublished = {\url{}}
This project was generated with python-package-template