---
title: Processing Data Pipeline on Hadoop clusters with Apache Falcon
layout: post
date: 2015-08-13T12:34:00.001-07:00
author: Saptak Sen
tags:
  - falcon
  - hadoop
modified_time: 2015-08-13T15:11:18.054-07:00
---

Apache Falcon is a framework to simplify data pipeline processing and management on Hadoop clusters.

It makes it much simpler to onboard new workflows/pipelines, with support for late data handling and retry policies. It allows you to easily define relationships between various data and processing elements, and to integrate with a metastore/catalog such as Hive/HCatalog. Finally, it lets you capture lineage information for feeds and processes.

In this tutorial we are going to walk through the process of:

  • Defining the feeds and processes
  • Defining and executing a job to mirror data between two clusters
  • Defining and executing a data pipeline to ingest, process and persist data continuously

###Prerequisite

Download the Hortonworks Sandbox.

Once you have downloaded the Hortonworks Sandbox and started the VM, navigate to the Ambari interface on port 8080 of your Sandbox VM's IP address. Log in with the username admin and the password admin:
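In other words, point your browser at a URL of the form below, substituting your own Sandbox VM's IP address:

http://<sandbox-ip-address>:8080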

###Scenario

In this tutorial we will walk through a scenario where email data lands hourly on a cluster. In our example:

  • This cluster is the primary cluster located in the Oregon data center.
  • Data arrives from all the West Coast production servers. The input data feeds are often late by up to 4 hours.

The goal is to clean the raw data to remove sensitive information like credit card numbers and make it available to our marketing data science team for customer churn analysis.

To simulate this scenario, we have a Pig script that grabs the freely available Enron emails from the internet and feeds them into the pipeline.

###Starting Falcon

By default, Falcon is not started on the sandbox. Click on the Falcon icon in the left-hand bar:

Then click on the Service Actions button on the top right:

Then click on Start:

Once Falcon starts, Ambari should clearly indicate, as below, that the service has started:

###Downloading and staging the dataset

Now let's stage the dataset using the command line. Although we perform many of these file operations below using the command line, you can also do the same with the HDFS Files View in Ambari.

First SSH into the Hortonworks Sandbox with the command:
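A typical invocation, assuming the default VirtualBox setup that forwards the sandbox's SSH port to 2222 on localhost (adjust the host and port for your own environment), is:

ssh root@127.0.0.1 -p 2222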

The default password is hadoop

Then log in as the user hdfs:

su - hdfs

Then download the file falcon.zip with the following command:

wget http://hortonassets.s3.amazonaws.com/tutorial/falcon/falcon.zip

and then unzip it with the command:

unzip falcon.zip

Now let's give ourselves permission to upload files:

hadoop fs -chmod -R 777 /user/ambari-qa

Then let's create a folder falcon under ambari-qa with the command:

hadoop fs -mkdir /user/ambari-qa/falcon

Now let's upload the decompressed folder with the command

hadoop fs -copyFromLocal demo /user/ambari-qa/falcon/
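To confirm that the dataset landed where we expect (an optional sanity check, not part of the original steps), list the directory:

hadoop fs -ls /user/ambari-qa/falcon/demo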

###Creating the cluster entities

Before creating the cluster entities, we need to create the directories on HDFS representing the two clusters that we are going to define, namely primaryCluster and backupCluster.

Use hadoop fs -mkdir commands to create the directories /apps/falcon/primaryCluster and /apps/falcon/backupCluster on HDFS.

hadoop fs -mkdir /apps/falcon/primaryCluster
hadoop fs -mkdir /apps/falcon/backupCluster

Further create directories called staging inside each of the directories we created above:

hadoop fs -mkdir /apps/falcon/primaryCluster/staging
hadoop fs -mkdir /apps/falcon/backupCluster/staging

Next, we will need to create the working directories for primaryCluster and backupCluster:

hadoop fs -mkdir /apps/falcon/primaryCluster/working
hadoop fs -mkdir /apps/falcon/backupCluster/working

Finally you need to set the proper permissions on the staging/working directories:

hadoop fs -chmod 777 /apps/falcon/primaryCluster/staging
hadoop fs -chmod 755 /apps/falcon/primaryCluster/working
hadoop fs -chmod 777 /apps/falcon/backupCluster/staging
hadoop fs -chmod 755 /apps/falcon/backupCluster/working
hadoop fs -chown -R falcon /apps/falcon/*
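If you want to verify the directory layout, permissions, and ownership before moving on (an optional check), a recursive listing shows everything under /apps/falcon:

hadoop fs -ls -R /apps/falcon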

Let's open the Falcon Web UI. You can easily launch the Falcon Web UI from Ambari:

You can also navigate to the Falcon Web UI directly in your browser. The Falcon UI is by default at port 15000. The default username is ambari-qa and the password is admin.
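That is, a URL of the form below, again substituting your Sandbox VM's IP address:

http://<sandbox-ip-address>:15000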

This UI allows us to create and manage the various entities like Cluster, Feed, Process and Mirror. Each of these entities is represented by an XML file, which you can either upload directly or generate by filling in the various fields.

You can also search for existing entities and then edit them, change their state, and so on.

Let's first create a couple of cluster entities. To create a cluster entity click on the Cluster button on the top.

Then click on the edit button over XML Preview area on the right hand side of the screen and replace the XML content with the XML document below:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<cluster name="primaryCluster" description="this is primary cluster" colo="primaryColo" xmlns="uri:falcon:cluster:0.1">
    <tags>primaryKey=primaryValue</tags>
    <interfaces>
        <interface type="readonly" endpoint="hftp://sandbox.hortonworks.com:50070" version="2.2.0"/>
        <interface type="write" endpoint="hdfs://sandbox.hortonworks.com:8020" version="2.2.0"/>
        <interface type="execute" endpoint="sandbox.hortonworks.com:8050" version="2.2.0"/>
        <interface type="workflow" endpoint="http://sandbox.hortonworks.com:11000/oozie/" version="4.0.0"/>
        <interface type="messaging" endpoint="tcp://sandbox.hortonworks.com:61616?daemon=true" version="5.1.6"/>
    </interfaces>
    <locations>
        <location name="staging" path="/apps/falcon/primaryCluster/staging"/>
        <location name="temp" path="/tmp"/>
        <location name="working" path="/apps/falcon/primaryCluster/working"/>
    </locations>
    <ACL owner="ambari-qa" group="users" permission="0x755"/>
    <properties>
        <property name="test" value="value1"/>
    </properties>
</cluster>

Click Finish on top of the XML Preview area to save the XML.

Falcon UI should have automatically parsed out the values from the XML and populated the right fields. Once you have verified that these are the correct values, press Next.

Click Save to persist the entity.
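If you prefer the command line, the same entity can also be submitted with the Falcon CLI. This is just a sketch: it assumes you have saved the XML above to a local file named primaryCluster.xml (an example filename) and are running as a user allowed to talk to Falcon:

falcon entity -type cluster -submit -file primaryCluster.xml

The same pattern applies to the feed and process entities defined later in this tutorial; change -type to feed or process and point -file at the corresponding XML.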

Similarly, we will create the backupCluster entity. Again click on the Cluster button on the top to open up the form to create the cluster entity.

Then click on the edit button over XML Preview area on the right hand side of the screen and replace the XML content with the XML document below:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<cluster name="backupCluster" description="this is backup colo" colo="backupColo" xmlns="uri:falcon:cluster:0.1">
    <tags>backupTag=backupTagValue</tags>
    <interfaces>
        <interface type="readonly" endpoint="hftp://sandbox.hortonworks.com:50070" version="2.2.0"/>
        <interface type="write" endpoint="hdfs://sandbox.hortonworks.com:8020" version="2.2.0"/>
        <interface type="execute" endpoint="sandbox.hortonworks.com:8050" version="2.2.0"/>
        <interface type="workflow" endpoint="http://sandbox.hortonworks.com:11000/oozie/" version="4.0.0"/>
        <interface type="messaging" endpoint="tcp://sandbox.hortonworks.com:61616?daemon=true" version="5.1.6"/>
    </interfaces>
    <locations>
        <location name="staging" path="/apps/falcon/backupCluster/staging"/>
        <location name="temp" path="/tmp"/>
        <location name="working" path="/apps/falcon/backupCluster/working"/>
    </locations>
    <ACL owner="ambari-qa" group="users" permission="0x755"/>
    <properties>
        <property name="key1" value="val1"/>
    </properties>
</cluster>

Click Finish on top of the XML Preview area to save the XML and then the Next button to verify the values.

Click Save to persist the backupCluster entity.

###Defining the rawEmailFeed entity

To create a feed entity click on the Feed button on the top of the main page on the Falcon Web UI.

Then click on the edit button over XML Preview area on the right hand side of the screen and replace the XML content with the XML document below:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<feed name="rawEmailFeed" description="Raw customer email feed" xmlns="uri:falcon:feed:0.1">
    <tags>externalSystem=USWestEmailServers</tags>
    <groups>churnAnalysisDataPipeline</groups>
    <frequency>hours(1)</frequency>
    <timezone>UTC</timezone>
    <late-arrival cut-off="hours(1)"/>
    <clusters>
        <cluster name="primaryCluster" type="source">
            <validity start="2015-07-22T01:00Z" end="2015-07-22T10:00Z"/>
            <retention limit="days(90)" action="delete"/>
        </cluster>
    </clusters>
    <locations>
        <location type="data" path="/user/ambari-qa/falcon/demo/primary/input/enron/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
        <location type="stats" path="/"/>
        <location type="meta" path="/"/>
    </locations>
    <ACL owner="ambari-qa" group="users" permission="0x755"/>
    <schema location="/none" provider="/none"/>
</feed>

Click Finish on the top of the XML Preview area

Falcon UI should have automatically parsed out the values from the XML and populated the right fields. Once you have verified that these are the correct values, press Next.

On the Clusters page ensure you modify the validity to a time slice which is in the very near future.

Click Next

Save the feed

###Defining the rawEmailIngestProcess entity

Now let's define the rawEmailIngestProcess.

To create a process entity click on the Process button on the top of the main page on the Falcon Web UI.

Then click on the edit button over XML Preview area on the right hand side of the screen and replace the XML content with the XML document below:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<process name="rawEmailIngestProcess" xmlns="uri:falcon:process:0.1">
    <tags>email=testemail</tags>
    <clusters>
        <cluster name="primaryCluster">
            <validity start="2015-07-22T01:00Z" end="2015-07-22T10:00Z"/>
        </cluster>
    </clusters>
    <parallel>1</parallel>
    <order>FIFO</order>
    <frequency>hours(1)</frequency>
    <timezone>UTC</timezone>
    <outputs>
        <output name="output" feed="rawEmailFeed" instance="now(0,0)"/>
    </outputs>
    <workflow name="emailIngestWorkflow" version="4.0.1" engine="oozie" path="/user/ambari-qa/falcon/demo/apps/ingest/fs"/>
    <retry policy="exp-backoff" delay="minutes(3)" attempts="3"/>
    <ACL owner="ambari-qa" group="users" permission="0x755"/>
</process>

Click Finish on the top of the XML Preview area

Accept the default values and click Next

On the Clusters page ensure you modify the validity to a time slice which is in the very near future and then click Next

Accept the default values and click Next

Let's save the process.

###Defining the cleansedEmailFeed

Again, to create a feed entity click on the Feed button on the top of the main page on the Falcon Web UI.

Then click on the edit button over XML Preview area on the right hand side of the screen and replace the XML content with the XML document below:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<feed name="cleansedEmailFeed" description="Cleansed customer emails" xmlns="uri:falcon:feed:0.1">
    <tags>cleanse=cleaned</tags>
    <groups>churnAnalysisDataPipeline</groups>
    <frequency>hours(1)</frequency>
    <timezone>UTC</timezone>
    <late-arrival cut-off="hours(4)"/>
    <clusters>
        <cluster name="primaryCluster" type="source">
            <validity start="2015-07-22T01:00Z" end="2015-07-22T10:00Z"/>
            <retention limit="hours(90)" action="delete"/>
            <locations>
                <location type="data" path="/user/ambari-qa/falcon/demo/primary/processed/enron/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
                <location type="stats" path="/"/>
                <location type="meta" path="/"/>
            </locations>
        </cluster>
        <cluster name="backupCluster" type="target">
            <validity start="2015-07-22T01:00Z" end="2015-07-22T10:00Z"/>
            <retention limit="hours(90)" action="delete"/>
            <locations>
                <location type="data" path="/falcon/demo/bcp/processed/enron/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
                <location type="stats" path="/"/>
                <location type="meta" path="/"/>
            </locations>
        </cluster>
    </clusters>
    <locations>
        <location type="data" path="/user/ambari-qa/falcon/demo/processed/enron/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
        <location type="stats" path="/"/>
        <location type="meta" path="/"/>
    </locations>
    <ACL owner="ambari-qa" group="users" permission="0x755"/>
    <schema location="/none" provider="/none"/>
</feed>

Click Finish on the top of the XML Preview area

Accept the default values and click Next

Accept the default values and click Next

On the Clusters page ensure you modify the validity to a time slice which is in the very near future and then click Next

Accept the default values and click Save

###Defining the cleanseEmailProcess

Now let's define the cleanseEmailProcess.

Again, to create a process entity click on the Process button on the top of the main page on the Falcon Web UI.

Then click on the edit button over XML Preview area on the right hand side of the screen and replace the XML content with the XML document below:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<process name="cleanseEmailProcess" xmlns="uri:falcon:process:0.1">
    <tags>cleanse=yes</tags>
    <clusters>
        <cluster name="primaryCluster">
            <validity start="2015-07-22T01:00Z" end="2015-07-22T10:00Z"/>
        </cluster>
    </clusters>
    <parallel>1</parallel>
    <order>FIFO</order>
    <frequency>hours(1)</frequency>
    <timezone>UTC</timezone>
    <inputs>
        <input name="input" feed="rawEmailFeed" start="now(0,0)" end="now(0,0)"/>
    </inputs>
    <outputs>
        <output name="output" feed="cleansedEmailFeed" instance="now(0,0)"/>
    </outputs>
    <workflow name="emailCleanseWorkflow" version="pig-0.13.0" engine="pig" path="/user/ambari-qa/falcon/demo/apps/pig/id.pig"/>
    <retry policy="exp-backoff" delay="minutes(5)" attempts="3"/>
    <ACL owner="ambari-qa" group="users" permission="0x755"/>
</process>

Click Finish on the top of the XML Preview area

Accept the default values and click Next

On the Clusters page ensure you modify the validity to a time slice which is in the very near future and then click Next

Select the Input and Output Feeds as shown below and Save

###Running the feeds

From the Falcon Web UI home page search for the Feeds we created

Select the rawEmailFeed by clicking on the checkbox

Then click on the Schedule button on the top of the search results

Next run the cleansedEmailFeed in the same way
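Feeds can also be scheduled from the command line with the Falcon CLI, a sketch assuming the entity names used above:

falcon entity -type feed -name rawEmailFeed -schedule
falcon entity -type feed -name cleansedEmailFeed -schedule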

###Running the processes

From the Falcon Web UI home page search for the Processes we created

Select the cleanseEmailProcess by clicking on the checkbox

Then click on the Schedule button on the top of the search results

Next run the rawEmailIngestProcess in the same way
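Likewise, the processes can be scheduled, and their status checked, from the CLI, again assuming the entity names used above:

falcon entity -type process -name rawEmailIngestProcess -schedule
falcon entity -type process -name cleanseEmailProcess -schedule
falcon entity -type process -name cleanseEmailProcess -status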

If you visit the Oozie process page, you can see the processes running

###Input and Output of the pipeline

Now that the feeds and processes are running, we can check the data being ingested into the pipeline and the data it produces on HDFS.

Here is the data being ingested

and here is the data produced by the pipeline
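If you would rather inspect these datasets from the command line, you can list the feed locations defined earlier (the exact hourly subdirectories you see depend on which instances have already run):

hadoop fs -ls /user/ambari-qa/falcon/demo/primary/input/enron
hadoop fs -ls /user/ambari-qa/falcon/demo/primary/processed/enron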

###Summary

In this tutorial we walked through a scenario where we defined a data pipeline with Apache Falcon that cleans the raw data to remove sensitive information like credit card numbers and makes it available to our marketing data science team for customer churn analysis.