This repository has been archived by the owner on Jan 29, 2022. It is now read-only.
Spark Usage
Luke Lovett edited this page Feb 19, 2015
This page describes how to use the MongoDB Hadoop Connector with Spark.
- Obtain the MongoDB Hadoop Connector. You can either build it or download the jars. The releases page also includes instructions for use with Maven and Gradle. For Spark, all you need is the "core" jar.
- Get a JAR for the MongoDB Java Driver.
These are the only two dependencies for building a project using Spark and MongoDB.
The rest of this page goes through the basics of setting up your Spark project to use MongoDB as a source or a sink. Although the example code is in Java, the equivalent code should work in Scala as well. PySpark is not supported. At a high level, here's what we're going to do:
- Create a new Configuration so we can set options on the MongoDB Hadoop Connector.
- Create a newAPIHadoopRDD using this Configuration and the InputFormat class we want to use, based on whether we want to read from a live cluster or a BSON snapshot.
- When we're ready to save data back into MongoDB or a BSON file, we'll call saveAsNewAPIHadoopFile on the RDD with the OutputFormat class we want.
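The choice in the second step, live cluster versus BSON snapshot, comes down to a few configuration keys. The following is a minimal sketch in plain Java, not part of the connector: `InputSettings` and `forSource` are hypothetical names, and treating anything that does not start with `mongodb://` as a BSON directory is an assumption made for illustration. The keys are the ones used in the runthrough on this page. The fully qualified name of `BSONFileInputFormat` is assumed by analogy with `com.mongodb.hadoop.BSONFileOutputFormat`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not part of the connector): picks input settings by source type. */
final class InputSettings {
    private InputSettings() {}

    /**
     * Returns the connector settings for a source location.
     * Assumption: a "mongodb://" URI means a live cluster; anything else
     * is treated as a directory holding BSON files.
     */
    static Map<String, String> forSource(String location) {
        Map<String, String> settings = new LinkedHashMap<>();
        if (location.startsWith("mongodb://")) {
            // Live cluster: read through MongoInputFormat.
            settings.put("mongo.job.input.format",
                         "com.mongodb.hadoop.MongoInputFormat");
            settings.put("mongo.input.uri", location);
        } else {
            // BSON snapshot: class name assumed by analogy with
            // com.mongodb.hadoop.BSONFileOutputFormat.
            settings.put("mongo.job.input.format",
                         "com.mongodb.hadoop.BSONFileInputFormat");
            settings.put("mapred.input.dir", location);
        }
        return settings;
    }

    public static void main(String[] args) {
        // Print the settings that would be used for a live collection.
        forSource("mongodb://localhost:27017/db.collection")
            .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Each entry of the returned map could then be copied into a Hadoop Configuration by calling set(key, value) on it.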
Here's a basic runthrough:
// Set configuration options for the MongoDB Hadoop Connector.
Configuration mongodbConfig = new Configuration();
// MongoInputFormat allows us to read from a live MongoDB instance.
// We could also use BSONFileInputFormat to read BSON snapshots.
mongodbConfig.set("mongo.job.input.format",
                  "com.mongodb.hadoop.MongoInputFormat");
// MongoDB connection string naming a collection to use.
// If using BSON, use "mapred.input.dir" to configure the directory
// where BSON files are located instead.
mongodbConfig.set("mongo.input.uri",
                  "mongodb://localhost:27017/db.collection");
// Create an RDD backed by the MongoDB collection.
JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
    mongodbConfig,            // Configuration
    MongoInputFormat.class,   // InputFormat: read from a live cluster.
    Object.class,             // Key class
    BSONObject.class          // Value class
);
// Create a separate Configuration for saving data back to MongoDB.
Configuration outputConfig = new Configuration();
outputConfig.set("mongo.job.output.format",
                 "com.mongodb.hadoop.MongoOutputFormat");
outputConfig.set("mongo.output.uri",
                 "mongodb://localhost:27017/output.collection");
// Save this RDD as a Hadoop "file".
// The path argument is unused; all documents will go to 'mongo.output.uri'.
documents.saveAsNewAPIHadoopFile(
    "file:///this-is-completely-unused",
    Object.class,
    BSONObject.class,
    MongoOutputFormat.class,
    outputConfig
);
// We can also save this back to a BSON file.
Configuration bsonOutputConfig = new Configuration();
bsonOutputConfig.set(
    "mongo.job.output.format",
    "com.mongodb.hadoop.BSONFileOutputFormat"
);
documents.saveAsNewAPIHadoopFile(
    "hdfs://localhost:8020/user/spark/bson-demo",
    Object.class,
    BSONObject.class,
    BSONFileOutputFormat.class,
    bsonOutputConfig
);
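Both "mongo.input.uri" and "mongo.output.uri" above must name a collection in the form db.collection. A small check before starting a job can catch a URI that leaves the collection out. The following is a simplified sketch in plain Java: `MongoNamespace` and `parse` are hypothetical names, not connector or driver APIs, and the parsing only covers the URI shape used on this page plus an optional "?options" suffix. The connector does its own parsing, which this does not replace.

```java
/** Hypothetical helper: splits "mongodb://host:port/db.collection" into database and collection. */
final class MongoNamespace {
    final String database;
    final String collection;

    private MongoNamespace(String database, String collection) {
        this.database = database;
        this.collection = collection;
    }

    /** Parses the namespace part of a URI, or throws if no collection is named. */
    static MongoNamespace parse(String uri) {
        String prefix = "mongodb://";
        if (!uri.startsWith(prefix)) {
            throw new IllegalArgumentException("Not a mongodb:// URI: " + uri);
        }
        // The namespace starts after the first '/' that follows the host list.
        int slash = uri.indexOf('/', prefix.length());
        if (slash < 0) {
            throw new IllegalArgumentException("URI names no database: " + uri);
        }
        String namespace = uri.substring(slash + 1);
        // Drop any "?option=value" suffix.
        int query = namespace.indexOf('?');
        if (query >= 0) {
            namespace = namespace.substring(0, query);
        }
        // Split on the first '.', since collection names may contain dots.
        int dot = namespace.indexOf('.');
        if (dot <= 0 || dot == namespace.length() - 1) {
            throw new IllegalArgumentException(
                "URI must name a collection as db.collection: " + uri);
        }
        return new MongoNamespace(namespace.substring(0, dot),
                                  namespace.substring(dot + 1));
    }

    public static void main(String[] args) {
        MongoNamespace ns = parse("mongodb://localhost:27017/output.collection");
        System.out.println(ns.database + " / " + ns.collection);
    }
}
```

Calling parse on each URI before building the Configuration objects makes a missing collection fail immediately with a clear message.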