Skip to content

Commit

Permalink
Make the pipeline connector abstract
Browse files Browse the repository at this point in the history
  • Loading branch information
ofirelarat committed Jul 19, 2023
1 parent 6da4235 commit 5bda6e6
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 43 deletions.
12 changes: 0 additions & 12 deletions pipeline/config.py

This file was deleted.

23 changes: 23 additions & 0 deletions pipeline/connector/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from bl.transformer_runner_interface import TransforerRunnerInterface
from connector.producer import AbstractProducer

class AbstractConsumer:
    """Interface for message consumers that feed the pipeline.

    A concrete subclass (e.g. a Kafka-backed consumer) wires a transformer
    and a producer together and drives the consume loop.
    """

    def __init__(self, trasformer: TransforerRunnerInterface, producer: AbstractProducer, configuration: dict):
        # NOTE(review): parameter name 'trasformer' (sic) is kept as-is so
        # keyword callers keep working; consider a project-wide rename.
        pass

    def consume(self):
        """Start consuming messages from the configured source."""
        pass

    def shutdown(self):
        """Stop the consumer and release any held resources, if needed."""
        pass

    def get_message_value_as_dict(self, msg):
        """Convert a raw message into a dictionary the pipeline logic can use.

        Args:
            msg: the raw message received by the consumer.
        """
        pass
18 changes: 18 additions & 0 deletions pipeline/connector/consumer_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import ast
import os
from connector.consumer import AbstractConsumer
from connector.kafka.kafka_consumer import KafkaConsumer
from connector.producer import AbstractProducer
from bl.transformer_runner_interface import TransforerRunnerInterface

def build_consumer(transformer: TransforerRunnerInterface, producer: AbstractProducer) -> AbstractConsumer:
    """Build the consumer selected by the 'type' environment variable.

    Returns a KafkaConsumer wired to the given transformer and producer when
    type == 'kafka'; returns None for any other (or missing) type.
    """
    if os.environ.get('type') != 'kafka':
        return None
    # 'input' is expected to hold a dict literal, e.g. "{'topic': 'in'}"
    # — TODO confirm against the deployment configuration.
    input_settings = ast.literal_eval(os.environ.get('input'))
    configuration = {
        'bootstrap.servers': os.environ.get('brokers'),
        'group.id': os.environ.get('group_id'),
        'topics': [input_settings['topic']],
    }
    return KafkaConsumer(transformer, producer, configuration)
Original file line number Diff line number Diff line change
@@ -1,33 +1,35 @@
import ast
import config as cfg
from confluent_kafka import Consumer, KafkaError, KafkaException
from utils.logger import logger
from bl.transformer_runner_interface import TransforerRunnerInterface
from kafka.producer import KafkaProducer
from connector.producer import AbstractProducer
from connector.consumer import AbstractConsumer

class KafkaConsumer:
class KafkaConsumer(AbstractConsumer):
'''Class responsible of consumning and creating kafka consumer.'''
def __init__(self, trasformer: TransforerRunnerInterface, producer: KafkaProducer):
logger.info(f"connect to: {cfg.kafka_config['bootstrap.servers']}")
def __init__(self, trasformer: TransforerRunnerInterface, producer: AbstractProducer, configuration: dict):
logger.info(f"connect to: {configuration['bootstrap.servers']}")
conf = {
'bootstrap.servers': cfg.kafka_config['bootstrap.servers'],
'group.id': cfg.kafka_config['group.id'],
'bootstrap.servers': configuration['bootstrap.servers'],
'group.id': configuration['group.id'],
'auto.offset.reset': 'latest'
}

self.topics = configuration['topics']

self.consumer = Consumer(conf)
self.transformer = trasformer
self.producer = producer
self.running = False

def consume(self, topics):
def consume(self):
'''Function start the consuming
Args:
topic: string array of the topic to consume'''
logger.info("start consume topics: %s", topics)
logger.info("start consume topics: %s", self.topics)
self.running = True
try:
self.consumer.subscribe(topics)
self.consumer.subscribe(self.topics)

while self.running:
msg = self.consumer.poll(timeout=1.0)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,17 @@
import json
from utils.logger import logger
from confluent_kafka import Producer
from connector.producer import AbstractProducer
import config as cfg

class KafkaProducer():
class KafkaProducer(AbstractProducer):
'''Class responsible of producing and creating kafka prooducer.'''
def __init__(self, topic):
def __init__(self, configuration: dict):
conf = {
'bootstrap.servers': cfg.kafka_config['bootstrap.servers']
}

self.producer = Producer(conf)
self.topic = topic


def produced_callback(self, err, msg):
'''Callback after produce
Args:
err: if the produce failed
msg: the msg that produced'''
if err is not None:
logger.error("Failed to deliver message: %s:%s", str(msg), str(err))
else:
logger.error("Message produced: %s", str(msg))
self.topic = configuration['topic']

def produce(self, values):
'''Function produce msg
Expand Down
23 changes: 23 additions & 0 deletions pipeline/connector/producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from utils.logger import logger

class AbstractProducer():
    '''Base class for message producers used by the pipeline connector.'''

    def __init__(self, configuration: dict):
        '''Args:
            configuration: backend-specific settings (e.g. the output topic).'''
        pass

    def produced_callback(self, err, msg):
        '''Callback invoked after a produce attempt.

        Args:
            err: delivery error, or None on success
            msg: the message that was produced'''
        if err is not None:
            logger.error("Failed to deliver message: %s:%s", str(msg), str(err))
        else:
            # Fix: successful delivery was logged at ERROR level; it is an
            # informational event, not an error.
            logger.info("Message produced: %s", str(msg))

    def produce(self, values):
        '''Produce a message; concrete subclasses implement the delivery.

        Args:
            values: the values that will be produced'''
        pass
13 changes: 13 additions & 0 deletions pipeline/connector/producer_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import ast
import os
from connector.kafka.kafka_producer import KafkaProducer
from connector.producer import AbstractProducer

def build_producer() -> AbstractProducer:
    """Build the producer selected by the 'type' environment variable.

    Returns a KafkaProducer targeting the configured output topic when
    type == 'kafka'; returns None for any other (or missing) type.
    """
    if os.environ.get('type') != 'kafka':
        return None
    # 'output' is expected to hold a dict literal, e.g. "{'topic': 'out'}"
    # — TODO confirm against the deployment configuration.
    output_settings = ast.literal_eval(os.environ.get('output'))
    return KafkaProducer({'topic': output_settings['topic']})
11 changes: 5 additions & 6 deletions pipeline/main.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
from utils.logger import logger
from config import kafka_input_topic, kafka_output_topic
from kafka.consumer import KafkaConsumer
from kafka.producer import KafkaProducer
from connector.consumer_builder import build_consumer
from connector.producer_builder import build_producer
from bl.transformation_builder import build_transformation

def main():
    '''Pipeline entrypoint: build the transformation, the producer and the
    consumer from environment configuration, then start the consume loop.'''
    logger.info("starting pipeline")
    transformer = build_transformation()
    # Connectors are resolved via the builder functions, which select the
    # concrete backend (e.g. Kafka) from the 'type' environment variable.
    producer = build_producer()
    consumer = build_consumer(transformer, producer)
    # NOTE(review): build_* return None for an unknown 'type'; this will
    # raise AttributeError here — confirm the intended failure mode.
    consumer.consume()

main()

0 comments on commit 5bda6e6

Please sign in to comment.