diff --git a/pipeline/config.py b/pipeline/config.py
deleted file mode 100644
index 1982ee3..0000000
--- a/pipeline/config.py
+++ /dev/null
@@ -1,12 +0,0 @@
-import os
-import ast
-
-
-kafka_config = {
-    'bootstrap.servers': os.environ.get('brokers'),
-    'group.id': os.environ.get('group_id'),
-}
-
-kafka_input_topic = ast.literal_eval(os.environ.get('input'))['topic']
-
-kafka_output_topic = ast.literal_eval(os.environ.get('output'))['topic']
diff --git a/pipeline/connector/consumer.py b/pipeline/connector/consumer.py
new file mode 100644
index 0000000..0f066ae
--- /dev/null
+++ b/pipeline/connector/consumer.py
@@ -0,0 +1,23 @@
+from bl.transformer_runner_interface import TransforerRunnerInterface
+from connector.producer import AbstractProducer
+
+class AbstractConsumer:
+    '''Base class responsible for creating a consumer and consuming messages.'''
+    def __init__(self, trasformer: TransforerRunnerInterface, producer: AbstractProducer, configuration: dict):
+        pass
+
+    def consume(self):
+        '''Start consuming.
+        Note:
+            the topics are taken from the configuration dict'''
+        pass
+
+    def shutdown(self):
+        '''Shut down the consumer if needed.'''
+        pass
+
+    def get_message_value_as_dict(self, msg):
+        '''Convert a raw message to a dict so the abstract connector can run its logic on it.
+        Args:
+            msg: the message received by the consumer'''
+        pass
\ No newline at end of file
diff --git a/pipeline/connector/consumer_builder.py b/pipeline/connector/consumer_builder.py
new file mode 100644
index 0000000..8a4a271
--- /dev/null
+++ b/pipeline/connector/consumer_builder.py
@@ -0,0 +1,18 @@
+import ast
+import os
+from connector.consumer import AbstractConsumer
+from connector.kafka.kafka_consumer import KafkaConsumer
+from connector.producer import AbstractProducer
+from bl.transformer_runner_interface import TransforerRunnerInterface
+
+def build_consumer(transformer: TransforerRunnerInterface, producer: AbstractProducer) -> AbstractConsumer:
+    consumer_type = os.environ.get('type')
+    if consumer_type == 'kafka':
+        kafka_config = {
+            'bootstrap.servers': os.environ.get('brokers'),
+            'group.id': os.environ.get('group_id'),
+            'topics': [ast.literal_eval(os.environ.get('input'))['topic']]
+        }
+        return KafkaConsumer(transformer, producer, kafka_config)
+    else:
+        return None
diff --git a/pipeline/kafka/consumer.py b/pipeline/connector/kafka/kafka_consumer.py
similarity index 78%
rename from pipeline/kafka/consumer.py
rename to pipeline/connector/kafka/kafka_consumer.py
index 47723c0..e35a797 100644
--- a/pipeline/kafka/consumer.py
+++ b/pipeline/connector/kafka/kafka_consumer.py
@@ -1,33 +1,35 @@
 import ast
-import config as cfg
 from confluent_kafka import Consumer, KafkaError, KafkaException
 from utils.logger import logger
 from bl.transformer_runner_interface import TransforerRunnerInterface
-from kafka.producer import KafkaProducer
+from connector.producer import AbstractProducer
+from connector.consumer import AbstractConsumer
 
 
-class KafkaConsumer:
+class KafkaConsumer(AbstractConsumer):
     '''Class responsible of consumning and creating kafka consumer.'''
-    def __init__(self, trasformer: TransforerRunnerInterface, producer: KafkaProducer):
-        logger.info(f"connect to: {cfg.kafka_config['bootstrap.servers']}")
+    def __init__(self, trasformer: TransforerRunnerInterface, producer: AbstractProducer, configuration: dict):
+        logger.info(f"connect to: {configuration['bootstrap.servers']}")
         conf = {
-            'bootstrap.servers': cfg.kafka_config['bootstrap.servers'],
-            'group.id': cfg.kafka_config['group.id'],
+            'bootstrap.servers': configuration['bootstrap.servers'],
+            'group.id': configuration['group.id'],
             'auto.offset.reset': 'latest'
         }
+        self.topics = configuration['topics']
+
         self.consumer = Consumer(conf)
         self.transformer = trasformer
        self.producer = producer
         self.running = False
 
-    def consume(self, topics):
+    def consume(self):
         '''Function start the consuming
         Args:
             topic: string array of the topic to consume'''
-        logger.info("start consume topics: %s", topics)
+        logger.info("start consume topics: %s", self.topics)
         self.running = True
         try:
-            self.consumer.subscribe(topics)
+            self.consumer.subscribe(self.topics)
             while self.running:
                 msg = self.consumer.poll(timeout=1.0)
 
diff --git a/pipeline/kafka/producer.py b/pipeline/connector/kafka/kafka_producer.py
similarity index 63%
rename from pipeline/kafka/producer.py
rename to pipeline/connector/kafka/kafka_producer.py
index a79a351..b8b5ea1 100644
--- a/pipeline/kafka/producer.py
+++ b/pipeline/connector/kafka/kafka_producer.py
@@ -1,28 +1,16 @@
 import json
-from utils.logger import logger
 from confluent_kafka import Producer
-import config as cfg
+from connector.producer import AbstractProducer
 
 
-class KafkaProducer():
+class KafkaProducer(AbstractProducer):
     '''Class responsible of producing and creating kafka prooducer.'''
-    def __init__(self, topic):
+    def __init__(self, configuration: dict):
         conf = {
-            'bootstrap.servers': cfg.kafka_config['bootstrap.servers']
+            'bootstrap.servers': configuration['bootstrap.servers']
         }
         self.producer = Producer(conf)
-        self.topic = topic
-
-
-    def produced_callback(self, err, msg):
-        '''Callback after produce
-        Args:
-            err: if the produce failed
-            msg: the msg that produced'''
-        if err is not None:
-            logger.error("Failed to deliver message: %s:%s", str(msg), str(err))
-        else:
-            logger.error("Message produced: %s", str(msg))
+        self.topic = configuration['topic']
 
     def produce(self, values):
         '''Function produce msg
diff --git a/pipeline/connector/producer.py b/pipeline/connector/producer.py
new file mode 100644
index 0000000..c96b66d
--- /dev/null
+++ b/pipeline/connector/producer.py
@@ -0,0 +1,23 @@
+from utils.logger import logger
+
+class AbstractProducer:
+    '''Base class responsible for creating a producer and producing messages.'''
+    def __init__(self, configuration: dict):
+        pass
+
+
+    def produced_callback(self, err, msg):
+        '''Callback invoked after a produce attempt
+        Args:
+            err: set if the delivery failed
+            msg: the msg that was produced'''
+        if err is not None:
+            logger.error("Failed to deliver message: %s:%s", str(msg), str(err))
+        else:
+            logger.info("Message produced: %s", str(msg))
+
+    def produce(self, values):
+        '''Produce a msg
+        Args:
+            values: the values that will be produced'''
+        pass
diff --git a/pipeline/connector/producer_builder.py b/pipeline/connector/producer_builder.py
new file mode 100644
index 0000000..4be5623
--- /dev/null
+++ b/pipeline/connector/producer_builder.py
@@ -0,0 +1,16 @@
+import ast
+import os
+from connector.kafka.kafka_producer import KafkaProducer
+from connector.producer import AbstractProducer
+
+def build_producer() -> AbstractProducer:
+    producer_type = os.environ.get('type')
+    if producer_type == 'kafka':
+        kafka_output_topic = ast.literal_eval(os.environ.get('output'))['topic']
+
+        return KafkaProducer({
+            'bootstrap.servers': os.environ.get('brokers'),
+            'topic': kafka_output_topic
+        })
+    else:
+        return None
diff --git a/pipeline/main.py b/pipeline/main.py
index c0b85ed..fc264df 100644
--- a/pipeline/main.py
+++ b/pipeline/main.py
@@ -1,15 +1,14 @@
 from utils.logger import logger
-from config import kafka_input_topic, kafka_output_topic
-from kafka.consumer import KafkaConsumer
-from kafka.producer import KafkaProducer
+from connector.consumer_builder import build_consumer
+from connector.producer_builder import build_producer
 from bl.transformation_builder import build_transformation
 
 def main():
     ''' The main function is the pipeline entrypoint. '''
     logger.info("starting pipeline")
     transformer = build_transformation()
-    producer = KafkaProducer(kafka_output_topic)
-    consumer = KafkaConsumer(transformer, producer)
-    consumer.consume([kafka_input_topic])
+    producer = build_producer()
+    consumer = build_consumer(transformer, producer)
+    consumer.consume()
 
 main()
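
Note on configuration: the builders above dispatch on the `type` environment variable and parse the `input` and `output` variables as Python dict literals via `ast.literal_eval`. Below is a minimal sketch of the environment a local run would need, based on the variables read in `consumer_builder.py` and `producer_builder.py`; the broker address, group id, and topic names are illustrative placeholders, not values taken from this change.

    # Hypothetical local-run environment; every value below is a placeholder.
    import os
    os.environ['type'] = 'kafka'                        # selects the Kafka consumer/producer pair
    os.environ['brokers'] = 'localhost:9092'            # passed through as 'bootstrap.servers'
    os.environ['group_id'] = 'pipeline-consumers'       # passed through as 'group.id'
    os.environ['input'] = "{'topic': 'raw-events'}"     # parsed with ast.literal_eval
    os.environ['output'] = "{'topic': 'clean-events'}"  # parsed with ast.literal_eval

With these set, `main()` builds the transformer, producer, and consumer and then blocks in `consume()`, polling the configured input topic.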