This project aims to simulate the process of ingesting and processing social media data using AWS data engineering services.
You will need an AWS account to set up this pipeline.
To set up your EC2 environment, you will need to generate a key pair (stored as a .pem file) and use it to connect to the EC2 instance securely.
For security reasons, this file must be kept private. To restrict its permissions, navigate to where the .pem file is stored and run the following command in the terminal:
chmod 400 "your_private_key.pem"
Then run this:
ssh -i /path/key-pair-name.pem instance-user-name@instance-public-dns-name
instance-user-name refers to the user you connect as (for example, the root user or ec2-user), and instance-public-dns-name is the public DNS of your instance.
You'll have to create your own MSK cluster on AWS. In this project, the cluster is called pinterest_msk.
Ensure that your EC2 instance has the necessary permissions to use MSK IAM authentication as well as to write to the destination bucket.
To download Kafka (ensure that the Kafka version matches the one expected by the IAM MSK authenticator), run the following INSIDE YOUR EC2 terminal:
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
tar -xzf kafka_2.12-2.8.1.tgz
Install the IAM MSK authentication package on your EC2 client. Enabling IAM access control gives us a way to authenticate and authorise clients against the cluster, checking whether a client is allowed to perform certain actions.
Your IAM trust policy should look something like this:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "ec2.amazonaws.com",
                    "kafkaconnect.amazonaws.com"
                ],
                "AWS": "arn:aws:iam::xxxxxx:role/xxxxxxxxxx-ec2-access-role"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
Now you should have the correct permissions. Run the following commands INSIDE YOUR EC2 terminal:
cd your_kafka_folder/libs
wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.5/aws-msk-iam-auth-1.1.5-all.jar
You may want to set up a CLASSPATH pointing to the MSK IAM jar, so the EC2 client can find and use it. This can be done in the .bashrc file.
Configure your EC2 client to use AWS IAM for cluster authentication by editing the client.properties file inside your Kafka folder.
Expect the following format:
# Sets up TLS for encryption and SASL for authN.
security.protocol = SASL_SSL
# Identifies the SASL mechanism to use.
sasl.mechanism = AWS_MSK_IAM
# Binds SASL client implementation.
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="Your Access Role";
# Encapsulates constructing a SigV4 signature based on extracted credentials.
# The SASL client bound by "sasl.jaas.config" invokes this class.
sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler
Your access role should refer to your EC2 client's IAM access role.
Ensure that you have your Bootstrap servers string and the Plaintext Apache Zookeeper connection string. These can be found in the MSK cluster's client information.
To create a Kafka topic:
Navigate to your Kafka bin folder (your_kafka_folder/bin) and run the following command:
./kafka-topics.sh --bootstrap-server BootstrapServerString --command-config client.properties --create --topic <topic_name>
with BootstrapServerString being the Bootstrap servers string and <topic_name> referring to the topic name.
A topic was created for each of the data sets: geographical data, user data, and Pinterest post data.
Create a custom plugin using MSK Connect so that data is automatically stored and saved into an AWS S3 bucket.
A plugin will contain the code that defines the logic and behaviour of our connector.
Inside your EC2 terminal, download the Confluent.io S3 connector and copy it to your desired S3 bucket by running the following commands:
# assume admin user privileges
sudo -u ec2-user -i
# create directory where we will save our connector
mkdir kafka-connect-s3 && cd kafka-connect-s3
# download connector from Confluent
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-s3/versions/10.0.3/confluentinc-kafka-connect-s3-10.0.3.zip
# copy connector to our S3 bucket
aws s3 cp ./confluentinc-kafka-connect-s3-10.0.3.zip s3://<BUCKET_NAME>/kafka-connect-s3/
Navigate to AWS MSK and, under MSK Connect, click Customised plugins.
Create the custom plugin by clicking Create customised plugin and locating the confluentinc-kafka-connect-s3-10.0.3.zip file. Assign a name to the plugin.
In the same location as the custom plugin, click Connectors and then press Create connector.
From the list of plugins, select your recently created plugin, give the connector a name, and choose your MSK cluster. Afterwards, configure the connector configuration settings.
Create a new API in API Gateway.
Next, create a resource by selecting Create resource.
For the resource name, type:
{proxy+}
and select the CORS option.
You then need to add an endpoint. In this case we will use an HTTP endpoint to communicate with an HTTP-endpoint backend.
For the endpoint URL, follow this format:
http://KafkaClientEC2InstancePublicDNS:8082/{proxy}
Ensure that your EC2 instance has the correct policies and trust relationships.
Then download the REST proxy package:
sudo wget https://packages.confluent.io/archive/7.2/confluent-7.2.0.tar.gz
tar -xvzf confluent-7.2.0.tar.gz
As with configuring the EC2 client, we need to configure the REST proxy to perform IAM authentication with the MSK cluster.
Navigate to confluent-7.2.0/etc/kafka-rest, then run this command:
nano kafka-rest.properties
You need to modify the bootstrap.servers and zookeeper.connect variables in this file with the corresponding Bootstrap servers string and Plaintext Apache Zookeeper connection string.
For the IAM authentication settings, expect a similar format to the client.properties file.
Finally, deploy your API and make a note of your invoke URL.
Navigate to the confluent-7.2.0/bin folder and run the following command INSIDE YOUR EC2 terminal:
./kafka-rest-start /home/ec2-user/confluent-7.2.0/etc/kafka-rest/kafka-rest.properties
Now that everything has been set up, you can send data to the S3 bucket through MSK. In a new terminal (not the EC2 one), run user_posting_emulation.py and see the data being consumed.
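For reference, a minimal sketch of the kind of request such a script sends is shown below. The invoke URL, topic name and record fields are placeholders; the real script pulls rows from a database before posting them.
import json
import requests

# Placeholder invoke URL and topic name - replace with your own values
invoke_url = "https://YOUR_API_ID.execute-api.us-east-1.amazonaws.com/YOUR_STAGE/topics/YOUR_TOPIC"

# Example record (illustrative fields only)
record = {"index": 1, "title": "example pin", "follower_count": 100}

# The Kafka REST proxy expects a "records" list wrapped in a vnd.kafka.json.v2 payload
payload = json.dumps({"records": [{"value": record}]})
headers = {"Content-Type": "application/vnd.kafka.json.v2+json"}

response = requests.post(invoke_url, headers=headers, data=payload)
print(response.status_code)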
Important: this stage is not available for Databricks Community Edition accounts.
- Retrieve the access key and secret access key from AWS IAM. You should receive a CSV file containing this information.
- Load that CSV file into Databricks using the Databricks UI.
- Get the path to the CSV file and read the Delta table into a Spark DataFrame.
- Extract the access key and secret access key from the Spark DataFrame and encode the secret access key.
- Mount the AWS bucket to Databricks, specifying where to mount it, using dbutils.fs.mount(). This is done once and the data can be retrieved at any time (see the sketch after this list).
- Find the desired data and load it as a Spark DataFrame by reading it in.
End result: 3 Spark DataFrames are formed.
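A minimal sketch of these mounting steps in a Databricks notebook (where spark and dbutils are available) is shown below. The file path, bucket name, mount name and topic path are placeholders, and the credential column names assume the standard AWS credentials CSV headers.
import urllib.parse
from pyspark.sql.functions import col

# Read the uploaded credentials into a Spark DataFrame (path is a placeholder)
aws_keys_df = spark.read.format("delta").load("dbfs:/user/hive/warehouse/authentication_credentials")

# Extract the keys and URL-encode the secret access key
ACCESS_KEY = aws_keys_df.select(col("Access key ID")).collect()[0][0]
SECRET_KEY = aws_keys_df.select(col("Secret access key")).collect()[0][0]
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

# Mount the S3 bucket to Databricks (bucket and mount names are placeholders)
AWS_S3_BUCKET = "your-destination-bucket"
MOUNT_NAME = "/mnt/pinterest_data"
SOURCE_URL = f"s3n://{ACCESS_KEY}:{ENCODED_SECRET_KEY}@{AWS_S3_BUCKET}"
dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

# Read one of the topics into a Spark DataFrame (topic path is a placeholder)
df_pin = spark.read.format("json").load(f"{MOUNT_NAME}/topics/<topic_name>.pin/partition=0/*.json")
The other two topics are read in the same way, giving the three DataFrames mentioned above.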
The code for cleaning each DataFrame can be found in the databrick_mounting_and_analysis folder. This includes the various steps taken to remove duplicates, correct erroneous data, and perform data transformations such as re-ordering columns and adjusting data types.
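As an illustration only (the actual column names and steps live in cleaning_spark_dataframe.ipynb), a cleaning step might look like this:
from pyspark.sql.functions import col, regexp_replace

df_pin_clean = (
    df_pin
    .dropDuplicates()                                                   # remove duplicate rows
    .withColumn("follower_count",
                regexp_replace(col("follower_count"), "k", "000"))      # correct erroneous data, e.g. "10k" -> "10000"
    .withColumn("follower_count", col("follower_count").cast("int"))    # adjust data types
    .select("index", "title", "follower_count")                         # re-order / select columns
)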
The queries analysing user activity by country, age group, and so on can be found in the querying_data notebook.
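A hypothetical query of this kind is sketched below; the join key and column names are illustrative, not the project's actual schema.
from pyspark.sql.functions import count

# Join the post and geographical DataFrames, then count posts per country
df_posts_per_country = (
    df_pin.join(df_geo, on="index")
          .groupBy("country")
          .agg(count("*").alias("post_count"))
          .orderBy("post_count", ascending=False)
)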
Apache Spark (namely PySpark) was used for loading (mounting) the data from the S3 bucket and for cleaning and analysing the DataFrames.
PySpark was chosen as it provides a more flexible approach when transforming data.
This step consists of automating the extraction, cleaning and querying of the Spark DataFrames built from the S3 bucket data, using AWS MWAA (Amazon Managed Workflows for Apache Airflow).
The results can be seen by running the DAG (1232252d77df_dag.py). Once triggered, it is scheduled to complete the process daily, as seen in the Airflow UI.
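A minimal sketch of what such a daily DAG might look like is shown below; the DAG name, notebook path, cluster id and connection id are placeholders, and the real DAG lives in 1232252d77df_dag.py.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

default_args = {
    "owner": "<your_name>",
    "retries": 1,
    "retry_delay": timedelta(minutes=2),
}

with DAG(
    dag_id="pinterest_batch_dag",            # hypothetical DAG name
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",              # run the notebook once a day
    catchup=False,
    default_args=default_args,
) as dag:
    # Submit the Databricks notebook run on an existing cluster
    run_notebook = DatabricksSubmitRunOperator(
        task_id="run_databricks_notebook",
        databricks_conn_id="databricks_default",
        existing_cluster_id="<your_cluster_id>",
        notebook_task={"notebook_path": "/Users/<your_user>/batch_processing"},
    )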
After a couple of days:
Now some real-time data has been obtained. Using the same tools as for retrieving batch data wouldn't suffice, so a new approach was introduced using AWS Kinesis Data Streams.
To retrieve the data, this project uses REST API requests (such as POST, GET, DELETE, etc.).
- Create the stream, so there is a place (or sink) to store the streaming data.
- Using API Gateway, create resources and methods that:
  - List streams in Kinesis
  - Create, describe and delete streams in Kinesis
  - Add records to streams in Kinesis
- Run user_posting_emulation_streaming.py, and you should be able to see the data in the AWS Kinesis Data Stream (a sketch of such a request follows after this list).
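A minimal sketch of how the streaming script might add one record to the stream through API Gateway is shown below. The invoke URL, stream name, partition key and record fields are placeholders, and it assumes the API method is mapped to the Kinesis PutRecord action.
import json
import requests

# Placeholder invoke URL and stream name - replace with your own values
invoke_url = "https://YOUR_API_ID.execute-api.us-east-1.amazonaws.com/YOUR_STAGE/streams/YOUR_STREAM/record"

# Example record (illustrative fields only)
record = {"index": 1, "title": "example pin", "follower_count": 100}

# Payload shape for a Kinesis PutRecord request proxied through API Gateway
payload = json.dumps({
    "StreamName": "YOUR_STREAM",
    "Data": record,
    "PartitionKey": "partition-1",
})

headers = {"Content-Type": "application/json"}
response = requests.request("PUT", invoke_url, headers=headers, data=payload)
print(response.status_code)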
The same ACCESS_KEY and SECRET_KEY used to retrieve data from the S3 bucket are also used to access AWS Kinesis:
df = spark \
.readStream \
.format('kinesis') \
.option('streamName','<KINESIS_STREAM_NAME>') \
.option('initialPosition','earliest') \
.option('region','us-east-1') \
.option('awsAccessKey', ACCESS_KEY) \
.option('awsSecretKey', SECRET_KEY) \
.load()
Normally, the streaming data is serialised for easy transmission or saving. We need to deserialise it (yay!) so that it is converted back into a structure (like a dictionary, a tuple, or a custom object) that Spark can process.
To deserialise the results:
df = df.selectExpr("CAST(data as STRING)")
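The decoded JSON string can then be parsed into columns with from_json. A minimal sketch with a hypothetical schema (the field names are illustrative, not the project's actual columns):
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Hypothetical schema - replace the fields with the columns in your own records
schema = StructType([
    StructField("index", IntegerType()),
    StructField("title", StringType()),
    StructField("follower_count", StringType()),
])

# Turn the JSON string back into named columns that Spark can process
df = df.withColumn("json", from_json(col("data"), schema)).select("json.*")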
### Transform the data
The data cleaning process used for the batch data was also used to clean the streaming data (for the user, geographical and Pinterest post data). Please refer to the sections on cleaning the Pinterest post, geographical and Pinterest user DataFrames.
The resulting DataFrames were written to Delta tables using Databricks.
Results of the data being stored in Delta tables:
Note: your Databricks cluster may crash, so you might need to restart it. Also, writing to the same checkpoint over time can cause errors, so use a different checkpoint location if that happens.
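A minimal sketch of writing a cleaned streaming DataFrame to a Delta table; the table name and checkpoint location are placeholders, and the checkpoint path should be changed if a previous run left an incompatible checkpoint behind.
# outputMode "append" adds new streaming records to the Delta table as they arrive
df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/") \
    .toTable("example_pin_table")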
To create and activate the new environment, run the following commands in the command line:
conda env create -f environments/environment.yml
conda activate pinterest_data_pipeline
### For bash users:
Run the following command in the terminal.
./env_setup_for_pip_users.sh
The bash file can be found inside the environments folder.
Note: the credentials relating to the creation of EC2 instances and the AWS credentials are only visible to the owner of the repository (aka me) and to contributors. For more information, please see the contribution section.
.
├── 1232252d77df_dag.py
├── README.md
├── awsdb.py
├── databrick_mounting_and_analysis
│ ├── cleaning_spark_dataframe.ipynb
│ ├── mounting_s3_data_to_databricks.ipynb
│ ├── querying_data.ipynb
│ └── reading_streaming_data.ipynb
├── environments
│ ├── env_for_pip_user.sh
│ ├── environment.yml
│ └── requirements.txt
├── img
│ ├── CloudPinterestPipeline.png
│ ├── airflow_days_later.png
│ ├── airflow_mwaa_ui.png
│ ├── df_geo.png
│ ├── df_pin.png
│ ├── df_user.png
│ ├── image.png
│ ├── loading_data_into_topics.png
│ ├── msk_connnector.png
│ └── msk_plugin.png
├── user_posting_emulation.py
└── user_posting_emulation_streaming.py
4 directories, 22 files
Make a pull request lol.