-
Notifications
You must be signed in to change notification settings - Fork 4
/
runner.py
51 lines (40 loc) · 1.22 KB
/
runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# -*- encoding: utf-8 -*-
# ! python3
"""
Inits the faust_ app and process the data
"""
from __future__ import annotations
import json
import logging
from typing import Final
import faust
from faust import Stream
from src.config import settings
from src.log import configure_basic_logging
from src.models import MathModel
from src.redis_.redis_connect import REDIS_CONNECT
from src.service import operation_add
configure_basic_logging()
L: Final = logging.getLogger(__name__)
app = faust.App("basic-math-app", broker=settings.BROKER)
math_topic = app.topic(settings.API_TO_SERVICE, value_type=bytes)
@app.agent(math_topic)
async def math_streaming(
stream: Stream
) -> None:
"""
Catches event from stream, process operations and publishes to redis_ topic
:param stream: Kafka event stream
:return: None
"""
async for payload in stream.events():
L.info(payload)
result = await operation_add(MathModel.parse_obj(payload.value))
REDIS_CONNECT.publish(
payload.headers[settings.REPLY_TO_OK_TOPIC],
json.dumps(result.dict())
)
L.info(
f"{result.dict()} sent to"
f" {payload.headers[settings.REPLY_TO_OK_TOPIC]}"
)