Skip to content

Commit

Permalink
Merge pull request apache#9 from dashar/YSPARK-599
Browse files Browse the repository at this point in the history
[YSPARK-599] Add spark hbase connector example
  • Loading branch information
Dhruve Ashar authored and GitHub Enterprise committed Jul 6, 2018
2 parents 9ce1c35 + d94ca32 commit 5a159bc
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 11 deletions.
Binary file added lib/shc-core-1.1.2-2.2-s_2.11-SNAPSHOT.jar
Binary file not shown.
25 changes: 21 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<spark.scala.version>2.11</spark.scala.version>
<scala.version>2.11.8</scala.version>
<java.version>1.7</java.version>
<hbase.version>0.98.7.19.1608152337_h2</hbase.version>
<hbase.version>1.3.2.13.1806282333_h2</hbase.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -78,6 +78,11 @@
<version>${hbase.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.hortonworks</groupId>
<artifactId>shc-core</artifactId>
<version>1.0</version>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -116,9 +121,21 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-install-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
<executions>
<execution>
<phase>initialize</phase>
<goals>
<goal>install-file</goal>
</goals>
<configuration>
<groupId>com.hortonworks</groupId>
<artifactId>shc-core</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<file>${basedir}/lib/shc-core-1.1.2-2.2-s_2.11-SNAPSHOT.jar</file>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
Expand Down
97 changes: 97 additions & 0 deletions src/main/scala/com/yahoo/spark/starter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#### Connecting to Apache HBase from Apache Spark using the SHC(Apache Spark - Apache HBase) Connector.
For the original project you can look up the documentation at:
* https://github.com/hortonworks-spark/shc

For using the connector you need to use HBase Version 1.3 and above. Most of our grids (verify the status of HBase deployment) have the required version of HBase although we have 0.98 clients running on most of the gateways for backward compatibility.


***

#### Setup

* You would need hbase 1.3 jars to be supplied with your spark job.
```bash
# Since hbase-1.3 is not available on the gateways, you need to install hbase-1.3 locally to access the jars.
yinst i hbase-1.3.2.9.1804250211_h2 -br quarantine -r ~/hbase -nosudo

# Define an ENV variable - HBASE_JARS to point to the directory you installed hbase locally.
export HBASE_JARS=~/hbase/libexec/hbase/lib
```

#### Getting the connector
You can add the spark hbase connector jar in your project directory and update your pom file to package the dependencies with your application jar. Refer to the pom file in the spark-starter project for details.

1. Copy the spark-hbase-connector jar from the spark-starter project to your project under a directory - `lib`. It can be any appropriate location.
2. Update your pom file to ensure the dependency is installed when you build your project.
```xml
<!-- Add this to the plugins section -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-install-plugin</artifactId>
<executions>
<execution>
<phase>initialize</phase>
<goals>
<goal>install-file</goal>
</goals>
<configuration>
<groupId>com.hortonworks</groupId>
<artifactId>shc-core</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<file>${basedir}/lib/shc-core-1.1.2-2.2-s_2.11-SNAPSHOT.jar</file>
</configuration>
</execution>
</executions>
</plugin>

<!-- specify the dependency under the dependencies section -->
<dependency>
<groupId>com.hortonworks</groupId>
<artifactId>shc-core</artifactId>
<version>1.0</version>
</dependency>
```

* Launching in cluster mode
```bash
# launch the shell
$SPARK_HOME/bin/spark-submit --master yarn --deploy-mode cluster --conf "spark.hadoop.validateOutputSpecs=false" --jars $HBASE_JARS/hbase-protocol.jar,$HBASE_JARS/hbase-common.jar,$HBASE_JARS/hbase-client.jar,$HBASE_JARS/htrace-core-3.1.0-incubating.jar,$HBASE_JARS/hbase-server.jar,$HBASE_JARS/guava-12.0.1.jar,$HBASE_JARS/metrics-core-2.2.0.jar --files $SPARK_CONF_DIR/hbase-site.xml ...
```

* Launching in client mode
```bash
# launch the shell
$SPARK_HOME/bin/spark-shell --master yarn --deploy-mode client --conf "spark.hadoop.validateOutputSpecs=false" --jars $HBASE_JARS/hbase-protocol.jar,$HBASE_JARS/hbase-common.jar,$HBASE_JARS/hbase-client.jar,$HBASE_JARS/htrace-core-3.1.0-incubating.jar,$HBASE_JARS/hbase-server.jar,$HBASE_JARS/guava-12.0.1.jar,$HBASE_JARS/metrics-core-2.2.0.jar,shc-core-1.1.2-2.2-s_2.11-SNAPSHOT.jar
```

* If you plan to write to HBase using the connector, set the required conf `"spark.hadoop.validateOutputSpecs=false"`. This is to workaround a bug in HBase. See YHBASE-2131 for more details.

### Using the connector
Check the hortonworks documentation for more details and examples [here](https://github.com/hortonworks-spark/shc) for further details.
Below is an additional example for your reference explaining how to define your own catalog.You need to define a catalog so that a DataFrame can be mapped to an HBase table or vice-versa. This is defined in a json format.
```
def catalog = s"""{
|"table":{"namespace":"default", "name":"table1"},
|"rowkey":"key",
|"columns":{
|"df_col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"df_col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"df_col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"df_col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"df_col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"df_col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"df_col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"df_col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"df_col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin
```
* `table` - you specify the namespace and the name of the table
* `rowkey` - you specify the column/s in the hbase table that form the rowkey. Composite keys are not currently supported.
* `columns` field, you specify the name of the column in the DataFrame and its corresponding column_family, column_name and column_type in hbase. You identify the column/s which are used to form the rowkey using the column_family as rowkey.

Note: Since a user can add any arbitrary columns to a column family, individual columns are not specified under the table schema in HBase table description. Therefore it is helpful to know the columns of interest available in hbase table while defining the catalog.



16 changes: 9 additions & 7 deletions src/main/scala/com/yahoo/spark/starter/SparkClusterHBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ object SparkClusterHBase {

def main(args: Array[String]) {

if (args == null || args.length < 1) {
System.err.println("Usage: SparkClusterHBase <tableName>")
if (args == null || args.length < 2) {
System.err.println("Usage: SparkClusterHBase <nameSpace> <tableName>")
System.exit(1)
}

Expand All @@ -25,19 +25,21 @@ object SparkClusterHBase {
getOrCreate()

val hconf = HBaseConfiguration.create()
val tableName = s"spark_test:${args(0)}"
hconf.set(TableInputFormat.INPUT_TABLE, tableName)
val nameSpace = args(0)
val tableName = args(1)
val qualifiedTableName = nameSpace + ":" + tableName
hconf.set(TableInputFormat.INPUT_TABLE, qualifiedTableName)
val admin = new HBaseAdmin(hconf)

// create the table if not existed
if(!admin.isTableAvailable(tableName)) {
val tableDesc = new HTableDescriptor(tableName)
if(!admin.isTableAvailable(qualifiedTableName)) {
val tableDesc = new HTableDescriptor(qualifiedTableName)
tableDesc.addFamily(new HColumnDescriptor("cf1".getBytes()));
admin.createTable(tableDesc)
}

// put data into the table
val myTable = new HTable(hconf, tableName);
val myTable = new HTable(hconf, qualifiedTableName);
for (i <- 0 to 5) {
val p = new Put(new String("row" + i).getBytes());
p.add("cf1".getBytes(), "column-1".getBytes(), new String("value " + i).getBytes());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.yahoo.spark.starter

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.sql.SparkSession

case class DataRow(name: String, pet: String, horcruxes: Int, original_name: String)

// Simple example of accessing HBase from Spark using the Spark HBase connector.
// The example creates an HBase table and writes sample data to it using the connector
// and reads back data from the same table and displays it to the stdout.
object SparkClusterHBaseConnector {

def main(args: Array[String]) {

if (args == null || args.length < 2) {
System.err.println("Usage: SparkClusterHBaseConnector <nameSpace> <tableName>")
System.exit(1)
}

// Use the new 2.0 API.
val spark = SparkSession.
builder.
appName("Spark HBase Connector Example").
getOrCreate()

import spark.implicits._

val nameSpace = s"${args(0)}"
val tableName = s"${args(1)}"

val df = Seq( DataRow("harry", "hedwig", 0, null), DataRow("voldy", "nagini", 7, "Tom"))
.toDF("name", "pet", "horcruxes", "original_name")
// print the original dataframe
df.show

// define the catalog
def catalog = s"""{
|"table":{"namespace":"${nameSpace}", "name":"${tableName}"},
|"rowkey":"name",
|"columns":{
|"name":{"cf":"rowkey", "col":"name", "type":"string"},
|"pet":{"cf":"cf1", "col":"pet", "type":"string"},
|"horcruxes":{"cf":"cf2", "col":"horcruxes", "type":"int"},
|"original_name":{"cf":"cf3", "col":"original_name", "type":"string"}
|}
|}""".stripMargin

// write data to hbase
hbaseWrite(df, catalog)

// read the data from hbase
val df2 = hbaseRead(spark, catalog)
df2.show

spark.stop
}

// Read data from an existing hbase table
def hbaseRead(spark: SparkSession, catalog: String): DataFrame = {
spark
.read
.options(Map(HBaseTableCatalog.tableCatalog->catalog))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
}

// Write data to an hbase table
def hbaseWrite(df: DataFrame, catalog: String): Unit = {
df
.write
.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
.format("org.apache.spark.sql.execution.datasources.hbase")
.save
}
}

0 comments on commit 5a159bc

Please sign in to comment.