Commit

Merge pull request #36 from DataFloz/connector-refactor

make abstract connector in pipeline

ofirelarat authored Jul 19, 2023
2 parents b88ae57 + 0d428b6 commit f447175
Showing 8 changed files with 106 additions and 47 deletions.
12 changes: 0 additions & 12 deletions pipeline/config.py

This file was deleted.

21 changes: 21 additions & 0 deletions pipeline/connector/consumer.py
@@ -0,0 +1,21 @@
from bl.transformer_runner_interface import TransforerRunnerInterface
from connector.producer import AbstractProducer

class AbstractConsumer:
    '''Class responsible for consuming and for creating the underlying consumer.'''
    def __init__(self, trasformer: TransforerRunnerInterface, producer: AbstractProducer,
                 configuration: dict):
        pass

    def consume(self):
        '''Start consuming the configured topics.'''

    def shutdown(self):
        '''Shut down the consumer if needed.'''

    def get_message_value_as_dict(self, msg):
        '''Convert a raw message to a dictionary so the abstract connector logic
        can run on it.
        Args:
            msg: the message received by the consumer'''
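The point of this interface is that new connectors can plug into the pipeline without touching its wiring. As a purely hypothetical sketch (not part of this commit), a non-Kafka implementation could look like the following; the transformer method name is an assumption, since TransforerRunnerInterface is not shown in this diff:

import ast
import sys

from connector.consumer import AbstractConsumer

class StdinConsumer(AbstractConsumer):
    '''Hypothetical consumer reading one dict literal per line from stdin.'''
    def __init__(self, trasformer, producer, configuration: dict):
        self.transformer = trasformer
        self.producer = producer
        self.running = False

    def consume(self):
        self.running = True
        for line in sys.stdin:
            if not self.running:
                break
            message = self.get_message_value_as_dict(line)
            # transform() is assumed; adjust to the real TransforerRunnerInterface.
            self.producer.produce(self.transformer.transform(message))

    def shutdown(self):
        self.running = False

    def get_message_value_as_dict(self, msg):
        return ast.literal_eval(msg)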
20 changes: 20 additions & 0 deletions pipeline/connector/consumer_builder.py
@@ -0,0 +1,20 @@
import ast
import os
from connector.consumer import AbstractConsumer
from connector.kafka.kafka_consumer import KafkaConsumer
from connector.producer import AbstractProducer
from bl.transformer_runner_interface import TransforerRunnerInterface

def build_consumer(transformer: TransforerRunnerInterface, producer: AbstractProducer) \
        -> AbstractConsumer:
    '''Build the consumer object.
    Returns:
        AbstractConsumer for the relevant environment connector'''
    consumer_type = os.environ.get('type')
    if consumer_type == 'kafka':
        kafka_config = {
            'bootstrap.servers': os.environ.get('brokers'),
            'group.id': os.environ.get('group_id'),
            'topics': [ast.literal_eval(os.environ.get('input'))['topic']]
        }
        return KafkaConsumer(transformer, producer, kafka_config)
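For reference, a minimal sketch of the environment this builder appears to expect; the keys (type, brokers, group_id, input) come from the code above, while the values are illustrative only:

import os

# Illustrative values; the keys mirror what build_consumer reads.
os.environ['type'] = 'kafka'
os.environ['brokers'] = 'localhost:9092'
os.environ['group_id'] = 'pipeline-group'
# 'input' is parsed with ast.literal_eval, so it holds a Python dict literal.
os.environ['input'] = "{'topic': 'my-input-topic'}"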
pipeline/connector/kafka/kafka_consumer.py
@@ -1,33 +1,36 @@
 import ast
-import config as cfg
 from confluent_kafka import Consumer, KafkaError, KafkaException
 from utils.logger import logger
 from bl.transformer_runner_interface import TransforerRunnerInterface
-from kafka.producer import KafkaProducer
+from connector.producer import AbstractProducer
+from connector.consumer import AbstractConsumer

-class KafkaConsumer:
+class KafkaConsumer(AbstractConsumer):
     '''Class responsible for consuming and for creating the Kafka consumer.'''
-    def __init__(self, trasformer: TransforerRunnerInterface, producer: KafkaProducer):
-        logger.info(f"connect to: {cfg.kafka_config['bootstrap.servers']}")
+    def __init__(self, trasformer: TransforerRunnerInterface, producer: AbstractProducer,
+                 configuration: dict):
+        logger.info(f"connect to: {configuration['bootstrap.servers']}")
         conf = {
-            'bootstrap.servers': cfg.kafka_config['bootstrap.servers'],
-            'group.id': cfg.kafka_config['group.id'],
+            'bootstrap.servers': configuration['bootstrap.servers'],
+            'group.id': configuration['group.id'],
             'auto.offset.reset': 'latest'
         }

+        self.topics = configuration['topics']
+
         self.consumer = Consumer(conf)
         self.transformer = trasformer
         self.producer = producer
         self.running = False

-    def consume(self, topics):
+    def consume(self):
         '''Start consuming the configured topics.'''
-        logger.info("start consume topics: %s", topics)
+        logger.info("start consume topics: %s", self.topics)
         self.running = True
         try:
-            self.consumer.subscribe(topics)
+            self.consumer.subscribe(self.topics)

             while self.running:
                 msg = self.consumer.poll(timeout=1.0)
@@ -61,4 +64,4 @@ def get_message_value_as_dict(self, msg):
         encode_msg_value = msg_value.decode('utf-8')
         message_data = ast.literal_eval(encode_msg_value)

-        return message_data
+        return message_data
pipeline/connector/kafka/kafka_producer.py
@@ -1,28 +1,16 @@
 import json
 from utils.logger import logger
 from confluent_kafka import Producer
-import config as cfg
+from connector.producer import AbstractProducer

-class KafkaProducer():
+class KafkaProducer(AbstractProducer):
     '''Class responsible for producing and for creating the Kafka producer.'''
-    def __init__(self, topic):
+    def __init__(self, configuration: dict):
         conf = {
-            'bootstrap.servers': cfg.kafka_config['bootstrap.servers']
+            'bootstrap.servers': configuration['bootstrap.servers']
         }

         self.producer = Producer(conf)
-        self.topic = topic
-
-
-    def produced_callback(self, err, msg):
-        '''Callback after produce
-        Args:
-            err: if the produce failed
-            msg: the msg that produced'''
-        if err is not None:
-            logger.error("Failed to deliver message: %s:%s", str(msg), str(err))
-        else:
-            logger.error("Message produced: %s", str(msg))
+        self.topic = configuration['topic']

     def produce(self, values):
         '''Function produce msg
@@ -33,5 +21,5 @@ def produce(self, values):
             self.producer.produce(self.topic, key=None, value=json.dumps(value, default=str),
                                   callback=self.produced_callback)
         else:
-            self.producer.produce(self.topic, key=None, value=json.dumps(values, default=str),
+            self.producer.produce(self.topic, key=None, value=json.dumps(values, default=str),
                                   callback=self.produced_callback)
22 changes: 22 additions & 0 deletions pipeline/connector/producer.py
@@ -0,0 +1,22 @@
from utils.logger import logger

class AbstractProducer():
    '''Class responsible for producing and for creating the producer.'''
    def __init__(self, configuration: dict):
        pass

    def produced_callback(self, err, msg):
        '''Callback invoked after a produce attempt
        Args:
            err: set if the produce failed
            msg: the msg that was produced'''
        if err is not None:
            logger.error("Failed to deliver message: %s:%s", str(msg), str(err))
        else:
            logger.info("Message produced: %s", str(msg))

    def produce(self, values):
        '''Produce a msg
        Args:
            values: the values that will be produced'''
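As with the consumer interface, a hypothetical non-Kafka producer could subclass this directly. A purely illustrative sketch, not part of this commit:

import json
import sys

from connector.producer import AbstractProducer

class StdoutProducer(AbstractProducer):
    '''Hypothetical producer writing each value as a JSON line to stdout.'''
    def __init__(self, configuration: dict):
        pass

    def produce(self, values):
        # Mirror the list/single-value handling of KafkaProducer.produce.
        records = values if isinstance(values, list) else [values]
        for value in records:
            sys.stdout.write(json.dumps(value, default=str) + '\n')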
18 changes: 18 additions & 0 deletions pipeline/connector/producer_builder.py
@@ -0,0 +1,18 @@
import ast
import os
from connector.kafka.kafka_producer import KafkaProducer
from connector.producer import AbstractProducer

def build_producer() -> AbstractProducer:
    '''Build the producer object.
    Returns:
        AbstractProducer for the relevant environment connector'''

    producer_type = os.environ.get('type')
    if producer_type == 'kafka':
        kafka_config = {
            'bootstrap.servers': os.environ.get('brokers'),
            'topic': ast.literal_eval(os.environ.get('output'))['topic']
        }

        return KafkaProducer(kafka_config)
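The producer side reads an analogous set of variables; again, the keys come from the code above and the values are illustrative:

import os

os.environ['type'] = 'kafka'
os.environ['brokers'] = 'localhost:9092'
# 'output' is parsed with ast.literal_eval, like 'input' on the consumer side.
os.environ['output'] = "{'topic': 'my-output-topic'}"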
11 changes: 5 additions & 6 deletions pipeline/main.py
@@ -1,15 +1,14 @@
 from utils.logger import logger
-from config import kafka_input_topic, kafka_output_topic
-from kafka.consumer import KafkaConsumer
-from kafka.producer import KafkaProducer
+from connector.consumer_builder import build_consumer
+from connector.producer_builder import build_producer
 from bl.transformation_builder import build_transformation

 def main():
     ''' The main function is the pipeline entrypoint. '''
     logger.info("starting pipeline")
     transformer = build_transformation()
-    producer = KafkaProducer(kafka_output_topic)
-    consumer = KafkaConsumer(transformer, producer)
-    consumer.consume([kafka_input_topic])
+    producer = build_producer()
+    consumer = build_consumer(transformer, producer)
+    consumer.consume()

 main()
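Since main() runs at import time, a hedged end-to-end sketch is simply to set the connector environment (keys from the builders above, values illustrative) before loading the module:

import os

os.environ['type'] = 'kafka'
os.environ['brokers'] = 'localhost:9092'
os.environ['group_id'] = 'pipeline-group'
os.environ['input'] = "{'topic': 'in-topic'}"
os.environ['output'] = "{'topic': 'out-topic'}"

import main  # importing runs main(), which builds and starts the pipeline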
