Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add elasticsearch #5

Merged
merged 1 commit into from
May 16, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ Values can be:
* HBASE
* MONGODB
* CASSANDRA
* ELASTICSEARCH

Here is a sample integration test:
```java
Expand Down Expand Up @@ -432,6 +433,7 @@ Values can be:
* HBASE
* MONGODB
* CASSANDRA
* ELASTICSEARCH

hadoopUnitPath is not mandatory but system enviroment variable HADOOP_UNIT_HOME must be defined.

Expand Down Expand Up @@ -498,6 +500,7 @@ public class HdfsBootstrapIntegrationTest {
* HBase
* MongoDB
* Cassandra 3.4
* ElasticSearch 5.0-alpha2

Built on:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public enum Component {
SOLRCLOUD("solrcloud"),
SOLR("solr"),
CASSANDRA("cassandra"),
MONGODB("mongodb");
MONGODB("mongodb"),
ELASTICSEARCH("elastic");

private String key;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ public class Config {
public static final String CASSANDRA_PORT_KEY = "cassandra.port";
public static final String CASSANDRA_TEMP_DIR_KEY = "cassandra.temp.dir";

// ElasticSearch
public static final String ELASTICSEARCH_IP_KEY = "elasticsearch.ip";
public static final String ELASTICSEARCH_HTTP_PORT_KEY = "elasticsearch.http.port";
public static final String ELASTICSEARCH_TCP_PORT_KEY = "elasticsearch.tcp.port";
public static final String ELASTICSEARCH_TEMP_DIR_KEY = "elasticsearch.temp.dir";
public static final String ELASTICSEARCH_INDEX_NAME = "elasticsearch.index.name";
public static final String ELASTICSEARCH_CLUSTER_NAME = "elasticsearch.cluster.name";

// Kafka
public static final String KAFKA_HOSTNAME_KEY = "kafka.hostname";
public static final String KAFKA_PORT_KEY = "kafka.port";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,6 @@ public class HadoopUtils {
private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);
private static Configuration configuration;

// public static void setHadoopHome() {
//
// // Set hadoop.home.dir to point to the windows lib dir
// if (System.getProperty("os.name").startsWith("Windows")) {
//
// String windowsLibDir = getHadoopHome();
//
// LOG.info("WINDOWS: Setting hadoop.home.dir: {}", windowsLibDir);
// System.setProperty("hadoop.home.dir", windowsLibDir);
// System.load(new File(windowsLibDir + Path.SEPARATOR + "lib" + Path.SEPARATOR + "hadoop.dll").getAbsolutePath());
// System.load(new File(windowsLibDir + Path.SEPARATOR + "lib" + Path.SEPARATOR + "hdfs.dll").getAbsolutePath());
//
// }
// }


public static void setHadoopHome() {

// Set hadoop.home.dir to point to the windows lib dir
Expand Down
28 changes: 28 additions & 0 deletions hadoop-unit-elasticsearch/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hadoop-unit</artifactId>
<groupId>fr.jetoile.hadoop</groupId>
<version>1.3-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>hadoop-unit-elasticsearch</artifactId>
<name>hadoop-unit-elasticsearch</name>

<dependencies>
<dependency>
<groupId>fr.jetoile.hadoop</groupId>
<artifactId>hadoop-unit-commons</artifactId>
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.jetoile.hadoopunit.component;

import fr.jetoile.hadoopunit.Component;
import fr.jetoile.hadoopunit.Config;
import fr.jetoile.hadoopunit.exception.BootstrapException;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Paths;

public class ElasticSearchBootstrap implements Bootstrap {
final public static String NAME = Component.ELASTICSEARCH.name();

final private Logger LOGGER = LoggerFactory.getLogger(ElasticSearchBootstrap.class);

private State state = State.STOPPED;

private Configuration configuration;

private String ip;
private int httpPort;
private int tcpPort;
private Client client;
private Node node;
private String tmpDir;
private String indexName;
private String clusterName;

public ElasticSearchBootstrap() {
try {
loadConfig();
} catch (BootstrapException e) {
LOGGER.error("unable to load configuration", e);
}
}

@Override
public String getName() {
return NAME;
}

@Override
public String getProperties() {
return "[" +
"clusterName:" + clusterName +
", ip:" + ip +
", httpPort:" + httpPort +
", tcpPort:" + tcpPort +
", indexName:" + indexName +
", tmpDir:" + tmpDir +
"]";
}

private void loadConfig() throws BootstrapException {
try {
configuration = new PropertiesConfiguration(Config.DEFAULT_PROPS_FILE);
} catch (ConfigurationException e) {
throw new BootstrapException("bad config", e);
}

httpPort = configuration.getInt(Config.ELASTICSEARCH_HTTP_PORT_KEY);
tcpPort = configuration.getInt(Config.ELASTICSEARCH_TCP_PORT_KEY);
ip = configuration.getString(Config.ELASTICSEARCH_IP_KEY);
tmpDir = configuration.getString(Config.ELASTICSEARCH_TEMP_DIR_KEY);
indexName = configuration.getString(Config.ELASTICSEARCH_INDEX_NAME);
clusterName = configuration.getString(Config.ELASTICSEARCH_CLUSTER_NAME);
}

private void build() {
client = createEmbeddedNode().client();
}

private Node createEmbeddedNode() {
// Instancie le client pour inserer
Settings settings = Settings.builder()
.put("cluster.name", clusterName)
.put("path.data", tmpDir + "/data")
.put("path.logs", tmpDir + "/logs")
.put("http.port", httpPort)
.put("transport.tcp.port", tcpPort)
.put("network.host", ip)
.put("path.home", tmpDir)
.build();

node = new Node(settings);
node.start();
LOGGER.debug("an embedded node is started");

return node;
}

@Override
public Bootstrap start() {
if (state == State.STOPPED) {
state = State.STARTING;
LOGGER.info("{} is starting", this.getClass().getName());
try {
build();
client.admin().indices().prepareCreate(indexName).execute().actionGet();

client.admin().cluster().prepareHealth().setWaitForYellowStatus().get();
} catch (Exception e) {
LOGGER.error("unable to add elastic", e);
}
state = State.STARTED;
LOGGER.info("{} is started", this.getClass().getName());
}

return this;
}

@Override
public Bootstrap stop() {
if (state == State.STARTED) {
state = State.STOPPING;
LOGGER.info("{} is stopping", this.getClass().getName());
try {
client.close();
node.close();
cleanup();
} catch (Exception e) {
LOGGER.error("unable to stop elastic", e);
}
state = State.STOPPED;
LOGGER.info("{} is stopped", this.getClass().getName());
}
return this;
}

@Override
public org.apache.hadoop.conf.Configuration getConfiguration() {
throw new UnsupportedOperationException("the method getConfiguration can not be called on ElasticSearchBootstrap");
}

private void cleanup() {
try {
FileUtils.deleteDirectory(Paths.get(tmpDir).toFile());
} catch (IOException e) {
LOGGER.error("unable to delete {}", tmpDir, e);
}
}

public Client getClient() {
return client;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
fr.jetoile.hadoopunit.component.ElasticSearchBootstrap
Loading