Skip to content

Commit

Permalink
Merge branch 'dev' into feat/2/factory-pattern-tables
Browse files Browse the repository at this point in the history
  • Loading branch information
lelouvincx authored Sep 23, 2023
2 parents eecc0fd + 51c6084 commit 0c0cb51
Show file tree
Hide file tree
Showing 17 changed files with 357 additions and 42 deletions.
89 changes: 89 additions & 0 deletions database-replication/.docker/connect-distributed.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
##
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##

# This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
# to be used with the examples, and some settings may differ from those used in a production system, especially
# the `bootstrap.servers` and those specifying replication factors.

# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=kafka-0:9092

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=connect-configs
config.storage.replication.factor=1

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.
# Specify hostname as 0.0.0.0 to bind to all interfaces.
# Leave hostname empty to bind to default interface.
# Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084
#listeners=HTTP://:8083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
# If not set, it uses the value for "listeners" if configured.
#rest.advertised.host.name=
#rest.advertised.port=
#rest.advertised.listener=

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=
8 changes: 5 additions & 3 deletions database-replication/.docker/images/app/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ WORKDIR /app

RUN python3 -m venv .venv
RUN pip install pip --upgrade

COPY app/requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt

COPY app/ .
RUN apt-get update && apt-get install -y curl && \
apt-get autoremove -y

COPY app/app .

CMD [ "streamlit", "run", "ui.py", "--server.address=0.0.0.0" ]
CMD [ "streamlit", "run", "streamlit_app.py", "--server.address=0.0.0.0" ]
1 change: 1 addition & 0 deletions database-replication/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
.docker/backups/*
restore.log
database-replication.code-workspace
htmlcov/
4 changes: 4 additions & 0 deletions database-replication/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ restart-build-d: down up-build-d
sleep:
sleep 20

# ============ Build images ============
build-upstream-app:
docker build -t upstream-app:dev -f .docker/images/app/Dockerfile .

# ============ Testing, formatting, type checks, link checks ============
app-requirements:
rm app/requirements.txt && \
Expand Down
Empty file.
6 changes: 6 additions & 0 deletions database-replication/app/app/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Package init: make the package's parent directory importable.

Appends the directory that contains this package to ``sys.path`` so that
sibling modules resolve as ``app.<module>`` regardless of the current
working directory.
"""
import os
import sys

# Absolute directory of this package file (name kept public on purpose —
# other modules may import it from here).
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))

# Parent of the package directory; appending it lets `import app.*` work.
_parent_dir = os.path.dirname(SCRIPT_DIR)
sys.path.append(_parent_dir)
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from sqlalchemy import text
from faker import Faker
from psql_connector import PsqlConnector
from app.psql_connector import PsqlConnector

import os
import logging
Expand Down Expand Up @@ -60,7 +60,7 @@ def set_attributes(self, attributes: list) -> None:
self._attributes = attributes

# Methods
def update_attributes(self, connector: PsqlConnector) -> None:
def update_attributes(self, connector: PsqlConnector) -> bool:
with connector.connect() as engine:
with engine.connect() as cursor:
sql_script = f"""
Expand Down Expand Up @@ -100,9 +100,11 @@ def update_attributes(self, connector: PsqlConnector) -> None:

if new_attributes == self._attributes:
logger.info("There's nothing to change")
return False
else:
self.set_attributes(new_attributes)
logger.info("Table attributes are updated")
return True


def gen_public_test(connector: PsqlConnector, num_records: int = 1) -> None:
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import streamlit as st
import pandas as pd
from sqlalchemy import text
from psql_connector import PsqlConnector
from app.psql_connector import PsqlConnector

from gen_data import gen_public_test

Expand All @@ -11,7 +11,7 @@
import logging
import time

load_dotenv(dotenv_path=".env")
load_dotenv(dotenv_path="../../.env")

# Init logging
logging.basicConfig(
Expand Down
3 changes: 3 additions & 0 deletions database-replication/app/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ pydeck==0.8.0
Pygments==2.16.1
Pympler==1.0.1
pytest==7.4.2
pytest-cov==4.1.0
pytest-dependency==0.5.1
pytest-ordering==0.6
python-dateutil==2.8.2
python-dotenv==1.0.0
python-json-logger==2.0.7
Expand Down
55 changes: 55 additions & 0 deletions database-replication/app/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Config relative path to module app
from os.path import dirname, abspath
import sys

parent_dir = dirname(dirname(abspath(__file__)))
sys.path.append(parent_dir)


from app.psql_connector import PsqlConnector
from sqlalchemy import text
from dotenv import load_dotenv
from os import environ as env
import pytest

load_dotenv()

# Connection parameters shared by the test modules; values come from .env.
psql_params = {
    "host": env["POSTGRES_HOST"],
    "port": env["POSTGRES_PORT"],
    "user": env["POSTGRES_USER"],
    "password": env["POSTGRES_PASSWORD"],
    "database": env["POSTGRES_DB"],
}


@pytest.fixture
def create_temp_table():
    """Create ``temp_table`` as a TEMP table and return its schema name.

    Returns:
        The ``pg_tables.schemaname`` that ``temp_table`` was created in
        (normally a ``pg_temp_N`` schema).

    Fails the test explicitly when the table cannot be located, instead of
    raising an opaque ``IndexError``.
    """
    psql_connector = PsqlConnector(psql_params)

    # Create temp_table
    with psql_connector.connect() as engine:
        with engine.connect() as cursor:
            sql_script = text("""
                CREATE TEMP TABLE temp_table (
                    id serial PRIMARY KEY,
                    name VARCHAR(50),
                    age INT
                )
            """)
            cursor.execute(sql_script)
            cursor.commit()

    # Find schema of temp_table.
    # NOTE(review): TEMP tables are visible only to the session that created
    # them — this second lookup only works if connect() hands back the same
    # underlying connection (e.g. via pooling); confirm against PsqlConnector.
    with psql_connector.connect() as engine:
        with engine.connect() as cursor:
            sql_script = text("""
                SELECT schemaname
                FROM pg_tables
                WHERE tablename = 'temp_table';
            """)
            row = cursor.execute(sql_script).fetchone()

    # Original code did `fetchone() or []` followed by `[0] or str`, which
    # could raise IndexError or hand back the `str` type object; fail loudly
    # with a clear message instead.
    if row is None or row[0] is None:
        pytest.fail("temp_table not found in pg_tables; cannot determine its schema")
    return row[0]
55 changes: 55 additions & 0 deletions database-replication/app/tests/test_gen_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from app.gen_data import Table, PsqlConnector
from conftest import psql_params
from sqlalchemy import text
import pytest


class TestTable:
    """Integration tests for ``Table`` against a live Postgres instance."""

    @pytest.mark.first
    @pytest.mark.dependency(name="TEST_CONNECTING")
    def test_connecting(self):
        """Smoke-test that a database connection can be opened at all."""
        psql_connector = PsqlConnector(psql_params)
        is_connected = False
        with psql_connector.connect() as engine:
            with engine.connect() as cursor:
                is_connected = True
                cursor.commit()
        assert is_connected is True, "Not connected to database"

    @pytest.mark.dependency(depends=["TEST_CONNECTING"])
    def test_update_attributes(self):
        """``update_attributes`` should report True for a freshly created table."""
        psql_connector = PsqlConnector(psql_params)

        # Create temp_table
        with psql_connector.connect() as engine:
            with engine.connect() as cursor:
                sql_script = text("""
                    CREATE TEMP TABLE temp_table (
                        id serial PRIMARY KEY,
                        name VARCHAR(50),
                        age INT
                    )
                """)
                cursor.execute(sql_script)
                cursor.commit()

        # Find schema of temp_table.
        # NOTE(review): TEMP tables are session-scoped; this lookup assumes
        # connect() reuses the creating session's connection — confirm. Also
        # consider reusing the `create_temp_table` fixture from conftest.py
        # instead of duplicating its setup here.
        with psql_connector.connect() as engine:
            with engine.connect() as cursor:
                sql_script = text("""
                    SELECT schemaname
                    FROM pg_tables
                    WHERE tablename = 'temp_table';
                """)
                row = cursor.execute(sql_script).fetchone()

        # Original indexed `fetchone() or []`, which raises IndexError when
        # the table is missing; fail with a clear message instead.
        if row is None:
            pytest.fail("temp_table not found in pg_tables")
        temp_table_schema = row[0]

        temp_table = Table(schema=temp_table_schema, name="temp_table")
        is_changed = temp_table.update_attributes(psql_connector)

        assert is_changed is True, "Attributes not changed."

    @pytest.mark.skip(reason="Not implemented due to WIP")
    @pytest.mark.dependency(depends=["TEST_CONNECTING"])
    def test_generate(self):
        pass
32 changes: 32 additions & 0 deletions database-replication/app/tests/test_psql_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from app.psql_connector import PsqlConnector
from conftest import psql_params
from sqlalchemy import text
import pytest


class TestPsqlConnector:
    """Integration tests for ``PsqlConnector`` against a live Postgres instance."""

    @pytest.mark.first
    @pytest.mark.dependency(name="TEST_CONNECTING")
    def test_connecting(self):
        """Smoke-test that a database connection can be opened at all."""
        psql_connector = PsqlConnector(psql_params)
        is_connected = False
        with psql_connector.connect() as engine:
            with engine.connect() as cursor:
                is_connected = True
                cursor.commit()
        assert is_connected is True, "Not connected to database."

    @pytest.mark.dependency(depends=["TEST_CONNECTING"])
    def test_getting_data(self):
        """``SELECT 1`` should round-trip and come back as the integer 1."""
        psql_connector = PsqlConnector(psql_params)
        with psql_connector.connect() as engine:
            with engine.connect() as cursor:
                sql_script = "SELECT 1;"
                try:
                    row = cursor.execute(text(sql_script)).fetchone()
                except Exception as e:
                    # Original printed and used `assert False`; pytest.fail
                    # carries the message into the report directly.
                    pytest.fail(f"Error when retrieving results from database: {e}")
                # Original did `fetchone() or []` then `[0] or int`, which
                # could raise IndexError or yield the `int` type object;
                # check the row explicitly instead.
                assert row is not None, "Query returned no rows."
                fetched_data = row[0]
        assert fetched_data == 1
Loading

0 comments on commit 0c0cb51

Please sign in to comment.