
Commit

upgrade achille 5.3.1, embedded-elastic 2.4.2, spark 2.2.0_2.11, solr 6.6.0
jetoile committed Oct 1, 2017
1 parent 80a1ba9 commit f575245
Showing 15 changed files with 170 additions and 171 deletions.
8 changes: 4 additions & 4 deletions hadoop-unit-client/hadoop-unit-client-spark/pom.xml
@@ -16,21 +16,21 @@

<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
+ <artifactId>spark-core_2.11</artifactId>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.10</artifactId>
+ <artifactId>spark-sql_2.11</artifactId>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.10</artifactId>
+ <artifactId>spark-hive_2.11</artifactId>
</dependency>

<dependency>
- <artifactId>jackson-module-scala_2.10</artifactId>
+ <artifactId>jackson-module-scala_2.11</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
</dependency>

2 changes: 1 addition & 1 deletion hadoop-unit-standalone/pom.xml
@@ -16,7 +16,7 @@
<aetherVersion>1.0.0.v20140518</aetherVersion>
<mongo-java-driver.version>2.11.3</mongo-java-driver.version>
<neo4j-java-driver.version>1.0.4</neo4j-java-driver.version>
- <cassandra-driver-core.version>3.1.1</cassandra-driver-core.version>
+ <cassandra-driver-core.version>3.3.0</cassandra-driver-core.version>
<oozie-client.version>4.2.0.2.5.3.0-37</oozie-client.version>
<hbase-client.version>1.2.3</hbase-client.version>
<alluxio-core-client.version>1.4.0</alluxio-core-client.version>

34 changes: 17 additions & 17 deletions pom.xml
@@ -123,7 +123,7 @@

<hadoop-unit.version>${project.version}</hadoop-unit.version>

- <slf4j.version>1.7.12</slf4j.version>
+ <slf4j.version>1.7.25</slf4j.version>
<logback-classic.version>1.2.3</logback-classic.version>

<commons-io.version>2.5</commons-io.version>
@@ -133,32 +133,32 @@
<hadoop-mini-clusters.version>0.1.14</hadoop-mini-clusters.version>

<hive.version>1.2.1000.2.6.2.0-205</hive.version>
- <solr.version>6.1.0</solr.version>
+ <solr.version>6.6.1</solr.version>
<oozie.version>4.2.0.2.6.2.0-205</oozie.version>
<hadoop.version>2.7.3.2.6.2.0-205</hadoop.version>
<knox.version>0.12.0.2.6.2.0-205</knox.version>
<kafka_2.10.version>0.10.1.2.6.2.0-205</kafka_2.10.version>

- <spark_2.10.version>1.6.2</spark_2.10.version>
- <spark-solr.version>1.1.2</spark-solr.version>
+ <spark_2.11.version>2.2.0</spark_2.11.version>
+ <spark-solr.version>3.2.1</spark-solr.version>
<gateway-service-oozie.version>0.5.0.2.2.9.26-3</gateway-service-oozie.version>

<dbSetup.version>2.1.0</dbSetup.version>
<httpclient.version>4.3.1</httpclient.version>
<guava.version>18.0</guava.version>
<concurrentlinkedhashmap-lru.version>1.4</concurrentlinkedhashmap-lru.version>
<lombok.version>1.16.6</lombok.version>
- <jetty-util.version>9.3.10.v20160621</jetty-util.version>
+ <jetty-util.version>9.3.21.v20170918</jetty-util.version>
<javax.servlet-api.version>3.1.0</javax.servlet-api.version>

<embedded-mongo.version>2.0.0</embedded-mongo.version>

<elasticsearch.version>5.4.3</elasticsearch.version>
- <embedded-elasticsearch.version>2.2.0</embedded-elasticsearch.version>
+ <embedded-elasticsearch.version>2.4.2</embedded-elasticsearch.version>

<metrics.version>3.1.2</metrics.version>
- <achilles.version>5.2.1</achilles.version>
- <jackson-module-scala_2.10.version>2.6.5</jackson-module-scala_2.10.version>
+ <achilles.version>5.3.1</achilles.version>
+ <jackson-module-scala_2.11.version>2.6.5</jackson-module-scala_2.11.version>

<neo4j.version>3.2.2</neo4j.version>
<alluxio.version>1.4.0</alluxio.version>
@@ -689,36 +689,36 @@

<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
- <version>${spark_2.10.version}</version>
+ <artifactId>spark-core_2.11</artifactId>
+ <version>${spark_2.11.version}</version>
<exclusions>
<exclusion>
<artifactId>hadoop-client</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
- <artifactId>jackson-module-scala_2.10</artifactId>
+ <artifactId>jackson-module-scala_2.11</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
- <artifactId>jackson-module-scala_2.10</artifactId>
+ <artifactId>jackson-module-scala_2.11</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
- <version>${jackson-module-scala_2.10.version}</version>
+ <version>${jackson-module-scala_2.11.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.10</artifactId>
- <version>${spark_2.10.version}</version>
+ <artifactId>spark-sql_2.11</artifactId>
+ <version>${spark_2.11.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.10</artifactId>
- <version>${spark_2.10.version}</version>
+ <artifactId>spark-hive_2.11</artifactId>
+ <version>${spark_2.11.version}</version>
</dependency>

<dependency>

16 changes: 10 additions & 6 deletions sample/kafka-spark-streaming/pom.xml
@@ -11,6 +11,10 @@

<artifactId>kafka-spark-streaming</artifactId>

+ <properties>
+ <spark-streaming-kafka_2.11.version>1.6.3</spark-streaming-kafka_2.11.version>
+ </properties>

<dependencies>
<dependency>
<groupId>fr.jetoile.hadoop</groupId>
@@ -31,8 +35,8 @@

<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kafka_2.10</artifactId>
- <version>${spark_2.10.version}</version>
+ <artifactId>spark-streaming-kafka_2.11</artifactId>
+ <version>${spark-streaming-kafka_2.11.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
@@ -57,19 +61,19 @@
</dependency>

<dependency>
- <artifactId>jackson-module-scala_2.10</artifactId>
+ <artifactId>jackson-module-scala_2.11</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.10</artifactId>
- <version>${spark_2.10.version}</version>
+ <artifactId>spark-streaming_2.11</artifactId>
+ <version>${spark_2.11.version}</version>
</dependency>


</dependencies>


<profiles>
<profile>
<id>default</id>

@@ -51,12 +51,9 @@ public void run() {
topicMap);

JavaDStream<String> messages = stream.map(r -> r._2());
- messages.foreach(r -> {
- r.foreach(t -> {
+ messages.foreachRDD(r -> {
System.out.println("========================");
- System.out.println(t);
+ System.out.println(r);
});
- return null;
- });
}
}
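
For context, here is a minimal, self-contained sketch (not part of the commit) of the streaming pattern this file now uses: with the Spark 2.x streaming API the old JavaDStream.foreach overload is gone, so per-batch work goes through foreachRDD. The topic name, group id, and ZooKeeper address below are illustrative, and the sketch assumes the spark-streaming_2.11 and spark-streaming-kafka_2.11 1.6.3 artifacts declared in the sample pom above.

import java.util.Collections;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class KafkaStreamingSketch {

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("kafka-streaming-sketch");
        JavaStreamingContext scc = new JavaStreamingContext(conf, Durations.seconds(2));

        // topic -> number of receiver threads (illustrative values)
        Map<String, Integer> topicMap = Collections.singletonMap("testtopic", 1);

        // Receiver-based stream from the spark-streaming-kafka 1.6.x API (ZooKeeper quorum is illustrative)
        JavaPairReceiverInputDStream<String, String> stream =
                KafkaUtils.createStream(scc, "localhost:22010", "sketch-group", topicMap);

        JavaDStream<String> messages = stream.map(r -> r._2());

        // foreachRDD replaces the removed JavaDStream.foreach: the action runs once per micro-batch
        messages.foreachRDD(rdd -> rdd.collect().forEach(System.out::println));

        scc.start();
        scc.awaitTermination();
    }
}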

@@ -51,7 +51,7 @@ public static void setUp() throws ConfigurationException {

@Ignore
@Test
- public void stark_should_read_kafka() {
+ public void stark_should_read_kafka() throws InterruptedException {

for (int i = 0; i < 10; i++) {
String payload = generateMessge(i);

8 changes: 4 additions & 4 deletions sample/parquet-spark/pom.xml
@@ -14,7 +14,7 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
+ <artifactId>spark-core_2.11</artifactId>
<exclusions>
<exclusion>
<artifactId>jackson-databind</artifactId>
@@ -25,7 +25,7 @@

<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.10</artifactId>
+ <artifactId>spark-sql_2.11</artifactId>
<exclusions>
<exclusion>
<artifactId>jackson-databind</artifactId>
@@ -36,12 +36,12 @@

<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.10</artifactId>
+ <artifactId>spark-hive_2.11</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-scala_2.10</artifactId>
+ <artifactId>jackson-module-scala_2.11</artifactId>
</dependency>

<dependency>

@@ -14,21 +14,19 @@

package fr.jetoile.hadoopunit.sample;

- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.sql.DataFrame;
- import org.apache.spark.sql.hive.HiveContext;
+ import org.apache.spark.sql.Dataset;
+ import org.apache.spark.sql.Row;
+ import org.apache.spark.sql.SparkSession;

public class SparkJob {

- private final JavaSparkContext sc;
+ private final SparkSession sqlContext;

- public SparkJob(JavaSparkContext sc) {
- this.sc = sc;
+ public SparkJob(SparkSession sqlContext) {
+ this.sqlContext = sqlContext;
}

- public DataFrame run() {
- HiveContext sqlContext = new HiveContext(sc);

+ public Dataset<Row> run() {
return sqlContext.sql("SELECT * FROM default.test");
}
}
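
As orientation for the migration above (not part of the commit), a minimal sketch of how such a job is driven in Spark 2.x: a single SparkSession with Hive support replaces the former JavaSparkContext/HiveContext pair, and Dataset<Row> takes the place of DataFrame. The runner class name is illustrative, and it assumes a Hive table default.test already exists, as the integration test below sets up.

// Placed in the sample's package only so the sketch can reference SparkJob directly
package fr.jetoile.hadoopunit.sample;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkJobRunnerSketch {

    public static void main(String[] args) {
        // A single SparkSession replaces JavaSparkContext + HiveContext in Spark 2.x
        SparkSession session = SparkSession.builder()
                .appName("spark-job-sketch")
                .master("local[*]")
                .enableHiveSupport()
                .getOrCreate();

        // In Spark 2.x, DataFrame is simply Dataset<Row>
        Dataset<Row> result = new SparkJob(session).run();
        result.show();

        session.close();
    }
}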

@@ -27,10 +27,9 @@
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.sql.DataFrame;
+ import org.apache.spark.sql.Dataset;
+ import org.apache.spark.sql.Row;
+ import org.apache.spark.sql.SparkSession;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,14 +46,10 @@

public class SparkJobIntegrationTest {

- static private Logger LOGGER = LoggerFactory.getLogger(SparkJobIntegrationTest.class);


- private static Configuration configuration;

public static Operation CREATE_TABLES = null;
public static Operation DROP_TABLES = null;

+ static private Logger LOGGER = LoggerFactory.getLogger(SparkJobIntegrationTest.class);
+ private static Configuration configuration;

@BeforeClass
public static void setUp() throws BootstrapException, SQLException, ClassNotFoundException, NotFoundServiceException {
@@ -117,73 +112,61 @@ public void upload_file_into_hdfs_and_map_hive_should_success() throws SQLExcept

@Test
public void spark_should_read_hive() throws SQLException {
- SparkConf conf = new SparkConf()
- .setMaster("local[*]")
- .setAppName("test");

- JavaSparkContext context = new JavaSparkContext(conf);
+ SparkSession sqlContext = SparkSession.builder().appName("test").master("local[*]").enableHiveSupport().getOrCreate();

- SparkJob sparkJob = new SparkJob(context);
- DataFrame sql = sparkJob.run();
+ SparkJob sparkJob = new SparkJob(sqlContext);
+ Dataset<Row> sql = sparkJob.run();

// sql.printSchema();
- Row[] rows = sql.collect();
+ Row[] rows = (Row[])sql.collect();

for (int i = 1; i < 4; i++) {
Row row = rows[i - 1];
assertThat("value" + i).isEqualToIgnoringCase(row.getString(1));
assertThat(i).isEqualTo(row.getInt(0));
}

- context.close();
+ sqlContext.close();
}

@Test
public void spark_should_create_a_parquet_file() throws SQLException, IOException {
- SparkConf conf = new SparkConf()
- .setMaster("local[*]")
- .setAppName("test");
+ SparkSession sqlContext = SparkSession.builder().appName("test").master("local[*]").enableHiveSupport().getOrCreate();

- JavaSparkContext context = new JavaSparkContext(conf);

- SparkJob sparkJob = new SparkJob(context);
- DataFrame sql = sparkJob.run();
+ SparkJob sparkJob = new SparkJob(sqlContext);
+ Dataset<Row> sql = sparkJob.run();

sql.write().parquet("hdfs://localhost:" + configuration.getInt(HadoopUnitConfig.HDFS_NAMENODE_PORT_KEY) + "/khanh/test_parquet/file.parquet");

FileSystem fileSystem = HdfsUtils.INSTANCE.getFileSystem();
assertThat(fileSystem.exists(new Path("hdfs://localhost:" + configuration.getInt(HadoopUnitConfig.HDFS_NAMENODE_PORT_KEY) + "/khanh/test_parquet/file.parquet"))).isTrue();

- context.close();
+ sqlContext.close();
}


@Test
public void spark_should_read_parquet_file() throws IOException {
//given
- SparkConf conf = new SparkConf()
- .setMaster("local[*]")
- .setAppName("test");

- JavaSparkContext context = new JavaSparkContext(conf);
+ SparkSession sqlContext = SparkSession.builder().appName("test").master("local[*]").enableHiveSupport().getOrCreate();

- SparkJob sparkJob = new SparkJob(context);
- DataFrame sql = sparkJob.run();
+ SparkJob sparkJob = new SparkJob(sqlContext);
+ Dataset<Row> sql = sparkJob.run();
sql.write().parquet("hdfs://localhost:" + configuration.getInt(HadoopUnitConfig.HDFS_NAMENODE_PORT_KEY) + "/khanh/test_parquet/file.parquet");

FileSystem fileSystem = HdfsUtils.INSTANCE.getFileSystem();
assertThat(fileSystem.exists(new Path("hdfs://localhost:" + configuration.getInt(HadoopUnitConfig.HDFS_NAMENODE_PORT_KEY) + "/khanh/test_parquet/file.parquet"))).isTrue();

- context.close();
+ sqlContext.close();

//when
- JavaSparkContext context1 = new JavaSparkContext(conf);
+ sqlContext = SparkSession.builder().appName("test").master("local[*]").enableHiveSupport().getOrCreate();

- SparkJob sparkJob1 = new SparkJob(context1);
- DataFrame sql1 = sparkJob1.run();
+ SparkJob sparkJob1 = new SparkJob(sqlContext);
+ Dataset<Row> sql1 = sparkJob1.run();

- DataFrame select = sql1.select("id", "value");
- Row[] rows = select.collect();
+ Dataset<Row> select = sql1.select("id", "value");
+ Row[] rows = (Row[]) select.collect();

//then
for (int i = 1; i < 4; i++) {
@@ -192,7 +175,7 @@ public void spark_should_read_parquet_file() throws IOException {
assertThat(i).isEqualTo(row.getInt(0));
}

- context.close();
+ sqlContext.close();


}
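
As a compact companion to the tests above (not part of the commit), a sketch of the Spark 2.x parquet round trip they exercise: a Dataset<Row> is written with DataFrameWriter.parquet and read back through SparkSession.read().parquet. The local path is illustrative; the tests build an hdfs:// URL from the namenode port found in the Hadoop Unit configuration.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ParquetRoundTripSketch {

    public static void main(String[] args) {
        SparkSession session = SparkSession.builder()
                .appName("parquet-sketch")
                .master("local[*]")
                .getOrCreate();

        // Illustrative local target; the tests write to hdfs://localhost:<namenode port>/khanh/test_parquet/file.parquet
        String path = "file:///tmp/test_parquet/file.parquet";

        // Build a tiny Dataset<Row> and write it as parquet
        Dataset<Row> df = session.sql("SELECT 1 AS id, 'value1' AS value");
        df.write().mode("overwrite").parquet(path);

        // Read it back and project the columns the assertions check
        Dataset<Row> readBack = session.read().parquet(path);
        readBack.select("id", "value").show();

        session.close();
    }
}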