-
Notifications
You must be signed in to change notification settings - Fork 4
/
api.py
153 lines (128 loc) · 3.75 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# -*- encoding: utf-8 -*-
# ! python3
from __future__ import annotations
import json
import logging
from typing import Final, List
from uuid import uuid4
from fastapi import FastAPI, Depends
from kafka import KafkaProducer
from redis.client import PubSub
from starlette.responses import JSONResponse
from src.config import settings
from src.kafka_.kafka_clients import kafka_producer
from src.log import configure_basic_logging
from src.models import MathModel, ResultModel
from src.redis_.redis_connect import redis_connect
configure_basic_logging()
L: Final = logging.getLogger(__name__)
app = FastAPI()
OK_POSTFIX: Final = ".OK"
ERR_POSTFIX: Final = ".NOT_OK"
def _group_subscribe(
redis_subscriber: PubSub,
channels: List[str]
) -> PubSub:
"""
Subsrcibes to the list of channels (<CID>.OK, <CID>.NOT_OK)
:param redis_subscriber: Redis PubSub
:param channels: list of channels for subscription
:return: PubSub with subscribed channels
"""
redis_subscriber.subscribe(*channels)
return redis_subscriber
def _gen_headers(
cid: str,
ok_channel: str,
err_channel: str,
) -> List[tuple[str, bytes]]:
"""
:param CID: Correlation ID
:param ok_channel: Channel for 200 responses
:param err_channel: Channel for != 200 responses
:return: List of headers tuple[str,bytes]
"""
return [(settings.CID, bytes(cid, "utf-8")),
(settings.REPLY_TO_OK_CHANNEL, bytes(ok_channel, "utf-8")),
(settings.REPLY_TO_NOT_OK_CHANNEL, bytes(err_channel, "utf-8"))]
def _channel_preparation(
cid: str
) -> tuple[str, str]:
"""
Adds postfixes to the CID
:param cid: Correlation ID
:return: Correlation ID with postfixes - for subscription
"""
ok_channel = f"{cid}{OK_POSTFIX}"
err_channel = f"{cid}{ERR_POSTFIX}"
return ok_channel, err_channel
async def send_kafka_event(
topic: str,
producer: KafkaProducer,
message: dict,
headers: List[tuple[str, bytes]],
) -> None:
"""
:param topic: Targeted kafka_ topic
:param producer: Kafka producer
:param message: payload message
:param headers: kafka_ headers with channels and CID List[tuple[str, bytes]]
:return: None
"""
producer.send(
topic,
key=None,
value=message,
headers=headers
)
producer.flush()
@app.post(
"/test/",
response_model=ResultModel,
description="Test description",
summary="Test summary",
tags=["Test"],
include_in_schema=True,
)
async def _(
*,
producer: KafkaProducer = Depends(kafka_producer),
redis_pubsub=Depends(redis_connect),
values: MathModel,
) -> JSONResponse:
"""
API Endpoint which sends Kafka events into Kafka topic, subscribes to
redis_ channel and waits for response (key-value)
:param producer: Kafka producer dependency
:param redis_pubsub: Redis PubSub client
:param values: MathModel - input data
:return: Response to the client
"""
cid: Final = uuid4().hex
ok_channel, err_channel = _channel_preparation(cid)
kafka_headers = _gen_headers(
cid,
ok_channel,
err_channel,
)
redis_subscriber = _group_subscribe(redis_pubsub, [ok_channel, err_channel])
L.info(
f"{settings.API_TO_SERVICE}, {producer}, {values.dict()}, {kafka_headers}"
)
await send_kafka_event(
settings.API_TO_SERVICE,
producer,
values.dict(),
kafka_headers
)
while True:
message = redis_subscriber.get_message()
if message:
break
L.info(message)
redis_subscriber.close()
return JSONResponse(
status_code=200, content=json.loads(
message["data"].decode("utf-8")
)
)