SparkMultiTool

Tools for Spark that we use on a daily basis. It contains:

  • A loader for HDFS files that combines small files (uses Hadoop CombineFileInputFormat)
  • Future: cosine calculation
  • Future: quantile calculation

#Requirements

This library was successfully tested with CDH 5.1.2 and Spark 1.1.0. You should also install SBT.

#Build

To build, install sbt, open a terminal, change to the sparkmultitool directory, and run:

sbt package
sbt test

Next, copy spark-multitool*.jar from ./target/scala-2.10/... to the lib folder of your sbt project.
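Since lib is sbt's default unmanaged-dependency directory, the jar is picked up without extra configuration. As a rough sketch, the consuming project's build.sbt could look like this (the project name and Scala version here are assumptions):

name := "tst"

version := "0.1"

scalaVersion := "2.10.4"

// Spark itself is supplied by the cluster at runtime, so it is marked "provided";
// jars dropped into lib/ are added automatically as unmanaged dependencies.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0" % "provided"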

#Usage

Include spark-multitool*.jar in the --jars path of spark-submit like this:

spark-submit --master local --executor-memory 2G --class "Tst" --num-executors 1 --executor-cores 1 --jars lib/spark-multitool_2.10-0.1.jar target/scala-2.10/tst_2.10-0.1.jar

See the examples folder.

##Loaders

ru.retailrocket.spark.multitool.Loaders combines input files before the mappers by means of Hadoop CombineFileInputFormat. In our case it reduced the number of mappers from 100,000 to approximately 3,000 and made the job significantly faster. Parameters:

  • sc - SparkContext object
  • path - path to the files (as in spark.textFile)
  • size - target partition size in megabytes; the optimal value equals the HDFS block size
  • delim - line delimiter

This example loads files from "/test/*" and combines them in the mappers.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import ru.retailrocket.spark.multitool.Loaders

object Tst {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("My App")
    // build the context from the conf instead of repeating its arguments
    val sc = new SparkContext(conf)

    val sessions = Loaders.combineTextFile(sc, "file:///test/*")
    // or: val sessions = Loaders.combineTextFile(sc, path, size = 256, delim = "\n")
    // where size is the split size in megabytes and delim is the line break string

    println(sessions.count())
  }
}
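For context, the combining behavior can be approximated with Hadoop's stock CombineTextInputFormat. The sketch below illustrates the general technique only, not the library's actual implementation; the 256 MB split cap is chosen for illustration:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat

// cap each combined split at 256 MB (value chosen for illustration)
val hadoopConf = sc.hadoopConfiguration
hadoopConf.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L * 1024 * 1024)

// newAPIHadoopFile packs many small files into each split, so far fewer mappers run
val lines = sc.newAPIHadoopFile(
  "file:///test/*",
  classOf[CombineTextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  hadoopConf
).map(_._2.toString)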
