In the highly competitive logistics and expedition industry, businesses must continuously optimize their operations to stay ahead. For an expedition company, this means efficiently managing warehouse capacities, employee allocations, and processing times. The challenge lies in accurately tracking the time span of each business process and identifying areas that need improvement. The "CDC and SCD Approach to Enhancing Expedition Operations" project addresses this challenge by implementing a data pipeline that leverages Change Data Capture (CDC) and Slowly Changing Dimension (SCD) type 2. This approach allows the company to monitor changes in key processes and make data-driven decisions to optimize operations, such as adjusting warehouse capacities or reallocating employees to high-demand locations.
The primary goals of the CDC and SCD project include:

- **Real-Time Data Tracking**: Implement CDC to monitor and capture real-time changes in order statuses (e.g., processing, shipping, arrived) and stream this data into BigQuery via Kafka. This enables the company to maintain an up-to-date view of business operations across all locations.
- **Historical Data Management**: Use SCD type 2 to track and store the history of changes in key business processes. By updating the `end_date` field in the data warehouse before inserting new records, the company can maintain a comprehensive history of each process, allowing for more accurate analysis and decision-making.
- **Operational Optimization**: Utilize the insights gained from real-time and historical data to identify and optimize underperforming warehouses or offices. The goal is to enhance overall business efficiency, reduce operational bottlenecks, and improve service delivery times.
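The SCD type 2 pattern described above can be sketched in plain Python: when a new status event arrives for an order, the currently open record (the one whose `end_date` is still empty) is closed by setting its `end_date`, and a fresh record is inserted as the new current version. This is a minimal in-memory illustration of the logic the pipeline applies in BigQuery; the field names (`order_id`, `status`, `start_date`, `end_date`) are assumptions based on the description above.

```python
def apply_scd2(history: list, order_id: int, status: str, event_time: str) -> list:
    """Close the open record for this order, then insert the new version.

    `history` is a list of dicts acting as a stand-in for the warehouse table.
    A row with end_date = None represents the current state of the order.
    """
    for row in history:
        if row["order_id"] == order_id and row["end_date"] is None:
            row["end_date"] = event_time  # close the previous version
    history.append({
        "order_id": order_id,
        "status": status,
        "start_date": event_time,
        "end_date": None,  # open record = current state
    })
    return history

# Example: one order moves processing -> shipping -> arrived.
history = []
apply_scd2(history, 1, "processing", "2024-01-01T08:00:00Z")
apply_scd2(history, 1, "shipping", "2024-01-02T10:30:00Z")
apply_scd2(history, 1, "arrived", "2024-01-03T15:45:00Z")
# Every status change keeps the previous row with a closed interval,
# so the time spent in each stage can be computed later.
```

Because each version keeps both a start and an end timestamp, the time span of every process stage is recoverable from the table, which is exactly what the analysis in this project relies on.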
- Clone the project:

  ```bash
  git clone https://github.com/ArkanNibrastama/cdc-mysql-bigquery.git
  ```
- Install all the dependencies:

  ```bash
  pip install -r requirements.txt
  ```
- Build all the containers:

  ```bash
  docker-compose up -d
  ```
- Create the database in the MySQL container. You can use a universal database tool like DBeaver, or open a terminal inside the MySQL container and log in as root (the password is `123`):

  ```bash
  mysql -u root -p123
  ```

  then

  ```sql
  CREATE DATABASE expedition;
  ```
- Run the API:

  ```bash
  uvicorn orderAPI:api --reload
  ```
- Create a Debezium connector by sending a POST request to the Kafka Connect REST API with the JSON configuration below as the request body (set the `Content-Type: application/json` header):

  ```json
  {
    "name": "expedition-order-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "root",
      "database.password": "123",
      "database.server.id": "184054",
      "topic.prefix": "source",
      "database.include.list": "expedition",
      "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
      "schema.history.internal.kafka.topic": "schemahistory.expedition",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
  }
  ```
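The registration request can also be scripted. The sketch below uses only the Python standard library and assumes the Kafka Connect REST API is exposed on `localhost:8083` (the default port; your docker-compose mapping may differ). The configuration dict mirrors the JSON above.

```python
import json
import urllib.request

# Connector configuration as described in the setup step.
connector = {
    "name": "expedition-order-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "123",
        "database.server.id": "184054",
        "topic.prefix": "source",
        "database.include.list": "expedition",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
        "schema.history.internal.kafka.topic": "schemahistory.expedition",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    },
}

def post_connector(base_url: str = "http://localhost:8083") -> urllib.request.Request:
    """Build the POST request for the Kafka Connect /connectors endpoint."""
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

if __name__ == "__main__":
    # Requires the Kafka Connect container to be running.
    with urllib.request.urlopen(post_connector()) as resp:
        print(resp.status)  # Kafka Connect answers 201 Created on success
```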
- Check the topics:

  ```bash
  python topics.py
  ```

  If there is no `source.expedition.order` topic, create new data through the API and then run the script again.
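A minimal equivalent of this topic check can be written with the `kafka-python` package (an assumption about the project's dependencies, as is the `localhost:29092` broker address, which depends on the docker-compose port mapping):

```python
def has_cdc_topic(topics: set, expected: str = "source.expedition.order") -> bool:
    """Return True if Debezium has created the CDC topic for the order table."""
    return expected in topics

if __name__ == "__main__":
    # Requires a running Kafka broker and the kafka-python package.
    from kafka import KafkaConsumer  # assumed broker port mapping below
    consumer = KafkaConsumer(bootstrap_servers="localhost:29092")
    all_topics = consumer.topics()
    print(sorted(all_topics))
    if not has_cdc_topic(all_topics):
        print("source.expedition.order not found -- create an order via the API first")
```

Debezium only creates the topic after it has captured at least one change, which is why the step above suggests inserting new data through the API if the topic is missing.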
- Send data into BigQuery by running `consumer.py`. Before that, make sure to add the service account JSON file and create the `expedition` dataset in BigQuery:

  ```bash
  python consumer.py
  ```
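Conceptually, the consumer reads each change event from the `source.expedition.order` topic and applies the SCD type 2 update in BigQuery. Because the connector uses the `ExtractNewRecordState` transform, each Kafka message value is already the flat new row state rather than the full before/after envelope. The sketch below shows the parsing step and the shape of the consume loop; the field names are assumptions, and the real BigQuery write would use the `google-cloud-bigquery` client with the service account file mentioned above.

```python
def parse_change_event(raw_value: bytes) -> dict:
    """Decode one unwrapped Debezium message into a row dict.

    With the ExtractNewRecordState transform configured on the connector,
    the message value is just the new row state as JSON.
    """
    import json
    return json.loads(raw_value.decode("utf-8"))

if __name__ == "__main__":
    # Sketch of the consume loop; requires Kafka plus BigQuery credentials.
    from kafka import KafkaConsumer  # assumption: kafka-python is a dependency
    consumer = KafkaConsumer(
        "source.expedition.order",
        bootstrap_servers="localhost:29092",  # assumed port mapping
        auto_offset_reset="earliest",
    )
    for message in consumer:
        row = parse_change_event(message.value)
        # SCD type 2 in BigQuery: UPDATE the open record's end_date for this
        # order, then INSERT `row` as the new current version.
        print(row)
```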
The implementation of the CDC and SCD approach has had a significant impact on the expedition company's operations. By enabling real-time tracking and comprehensive historical analysis, the company has been able to identify inefficiencies in its processes and make informed decisions to optimize warehouse capacities and employee allocations. As a result, the company can streamline approximately 30% of the delivery process, leading to higher customer satisfaction and a stronger competitive position in the market.
To better understand this repository, you can check out my Medium post about this project: CDC and SCD Approach to Enhancing Expedition Operations.