diff --git a/hadoop-unit-confluent/src/test/java/fr/jetoile/hadoopunit/component/ConfluentBootstrapTest.java b/hadoop-unit-confluent/src/test/java/fr/jetoile/hadoopunit/component/ConfluentBootstrapTest.java index 5f131d00..490f91cf 100644 --- a/hadoop-unit-confluent/src/test/java/fr/jetoile/hadoopunit/component/ConfluentBootstrapTest.java +++ b/hadoop-unit-confluent/src/test/java/fr/jetoile/hadoopunit/component/ConfluentBootstrapTest.java @@ -14,14 +14,11 @@ package fr.jetoile.hadoopunit.component; -import fr.jetoile.hadoopunit.Component; import fr.jetoile.hadoopunit.HadoopBootstrap; import fr.jetoile.hadoopunit.HadoopUnitConfig; import fr.jetoile.hadoopunit.exception.BootstrapException; -import fr.jetoile.hadoopunit.exception.NotFoundServiceException; import io.confluent.ksql.rest.client.KsqlRestClient; import io.confluent.ksql.rest.client.RestResponse; -import io.confluent.ksql.rest.entity.ExecutionPlan; import io.confluent.ksql.rest.entity.KsqlEntityList; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; @@ -43,10 +40,8 @@ public class ConfluentBootstrapTest { private static final Logger LOGGER = LoggerFactory.getLogger(ConfluentBootstrapTest.class); - static private Configuration configuration; - @BeforeClass public static void setup() throws BootstrapException { HadoopBootstrap.INSTANCE.startAll(); @@ -58,101 +53,103 @@ public static void setup() throws BootstrapException { } } - @AfterClass - public static void tearDown() throws BootstrapException { + public static void tearDown() { HadoopBootstrap.INSTANCE.stopAll(); } @Test public void schemaRegistry_should_be_ok() { Client client = ClientBuilder.newClient(); - String response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects/Kafka-key/versions") + + String schemaRegistryUrlPrefix = "http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY); + + String response = client.target(schemaRegistryUrlPrefix + "/subjects/Kafka-key/versions") .request("application/vnd.schemaregistry.v1+json") .post(Entity.entity("{\"schema\": \"{\\\"type\\\": \\\"string\\\"}\"}", "application/vnd.schemaregistry.v1+json"), String.class); assertThat(response).isEqualToIgnoringCase("{\"id\":1}"); - response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects/Kafka-value/versions") + response = client.target(schemaRegistryUrlPrefix + "/subjects/Kafka-value/versions") .request("application/vnd.schemaregistry.v1+json") .post(Entity.entity("{\"schema\": \"{\\\"type\\\": \\\"string\\\"}\"}", "application/vnd.schemaregistry.v1+json"), String.class); assertThat(response).isEqualToIgnoringCase("{\"id\":1}"); - response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects") + response = client.target(schemaRegistryUrlPrefix + "/subjects") .request("application/vnd.schemaregistry.v1+json") .get(String.class); assertThat(response).isEqualToIgnoringCase("[\"Kafka-value\",\"Kafka-key\"]"); - response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/schemas/ids/1") + response = client.target(schemaRegistryUrlPrefix + "/schemas/ids/1") .request("application/vnd.schemaregistry.v1+json") .get(String.class); assertThat(response).isEqualToIgnoringCase("{\"schema\":\"\\\"string\\\"\"}"); - response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects/Kafka-value/versions") + response = client.target(schemaRegistryUrlPrefix + "/subjects/Kafka-value/versions") .request("application/vnd.schemaregistry.v1+json") .get(String.class); assertThat(response).isEqualToIgnoringCase("[1]"); - response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects/Kafka-value/versions/1") + response = client.target(schemaRegistryUrlPrefix + "/subjects/Kafka-value/versions/1") .request("application/vnd.schemaregistry.v1+json") .get(String.class); assertThat(response).isEqualToIgnoringCase("{\"subject\":\"Kafka-value\",\"version\":1,\"id\":1,\"schema\":\"\\\"string\\\"\"}"); - response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects/Kafka-value/versions/1") + response = client.target(schemaRegistryUrlPrefix + "/subjects/Kafka-value/versions/1") .request("application/vnd.schemaregistry.v1+json") .delete(String.class); assertThat(response).isEqualToIgnoringCase("1"); - response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects/Kafka-value/versions") + response = client.target(schemaRegistryUrlPrefix + "/subjects/Kafka-value/versions") .request("application/vnd.schemaregistry.v1+json") .post(Entity.entity("{\"schema\": \"{\\\"type\\\": \\\"string\\\"}\"}", "application/vnd.schemaregistry.v1+json"), String.class); assertThat(response).isEqualToIgnoringCase("{\"id\":1}"); - response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects/Kafka-value/versions/latest") + response = client.target(schemaRegistryUrlPrefix + "/subjects/Kafka-value/versions/latest") .request("application/vnd.schemaregistry.v1+json") .delete(String.class); assertThat(response).isEqualToIgnoringCase("2"); - response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects/Kafka-value/versions") + response = client.target(schemaRegistryUrlPrefix + "/subjects/Kafka-value/versions") .request("application/vnd.schemaregistry.v1+json") .post(Entity.entity("{\"schema\": \"{\\\"type\\\": \\\"string\\\"}\"}", "application/vnd.schemaregistry.v1+json"), String.class); assertThat(response).isEqualToIgnoringCase("{\"id\":1}"); - response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/schemas/ids/1") + response = client.target(schemaRegistryUrlPrefix + "/schemas/ids/1") .request("application/vnd.schemaregistry.v1+json") .get(String.class); assertThat(response).isEqualToIgnoringCase("{\"schema\":\"\\\"string\\\"\"}"); - response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects/Kafka-key") + response = client.target(schemaRegistryUrlPrefix + "/subjects/Kafka-key") .request("application/vnd.schemaregistry.v1+json") .post(Entity.entity("{\"schema\": \"{\\\"type\\\": \\\"string\\\"}\"}", "application/vnd.schemaregistry.v1+json"), String.class); assertThat(response).isEqualToIgnoringCase("{\"subject\":\"Kafka-key\",\"version\":1,\"id\":1,\"schema\":\"\\\"string\\\"\"}"); - response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/compatibility/subjects/Kafka-value/versions/latest") + response = client.target(schemaRegistryUrlPrefix + "/compatibility/subjects/Kafka-value/versions/latest") .request("application/vnd.schemaregistry.v1+json") .post(Entity.entity("{\"schema\": \"{\\\"type\\\": \\\"string\\\"}\"}", "application/vnd.schemaregistry.v1+json"), String.class); assertThat(response).isEqualToIgnoringCase("{\"is_compatible\":true}"); - response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/config") + response = client.target(schemaRegistryUrlPrefix + "/config") .request("application/vnd.schemaregistry.v1+json") .get(String.class); assertThat(response).isEqualToIgnoringCase("{\"compatibilityLevel\":\"BACKWARD\"}"); - response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/config") + response = client.target(schemaRegistryUrlPrefix + "/config") .request("application/vnd.schemaregistry.v1+json") .put(Entity.entity("{\"compatibility\": \"NONE\"}", "application/vnd.schemaregistry.v1+json"), String.class); assertThat(response).isEqualToIgnoringCase("{\"compatibility\":\"NONE\"}"); - response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/config/Kafka-value") + response = client.target(schemaRegistryUrlPrefix + "/config/Kafka-value") .request("application/vnd.schemaregistry.v1+json") .put(Entity.entity("{\"compatibility\": \"BACKWARD\"}", "application/vnd.schemaregistry.v1+json"), String.class); assertThat(response).isEqualToIgnoringCase("{\"compatibility\":\"BACKWARD\"}"); - response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects/Kafka-value") + response = client.target(schemaRegistryUrlPrefix + "/subjects/Kafka-value") .request("application/vnd.schemaregistry.v1+json") .delete(String.class); assertThat(response).isEqualToIgnoringCase("[3]"); - response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects") + response = client.target(schemaRegistryUrlPrefix + "/subjects") .request("application/vnd.schemaregistry.v1+json") .get(String.class); assertThat(response).isEqualToIgnoringCase("[\"Kafka-key\"]"); @@ -162,31 +159,34 @@ public void schemaRegistry_should_be_ok() { public void kafkaRest_should_be_ok() { Client client = ClientBuilder.newClient(); - String response = client.target("http://" + configuration.getString(CONFLUENT_REST_HOST_KEY) + ":" + configuration.getString(CONFLUENT_REST_PORT_KEY) + "/topics/jsontest") + + String restUrlPrefix = "http://" + configuration.getString(CONFLUENT_REST_HOST_KEY) + ":" + configuration.getString(CONFLUENT_REST_PORT_KEY); + + String response = client.target(restUrlPrefix + "/topics/jsontest") .request("application/vnd.kafka.v2+json") .accept("application/vnd.kafka.v2+json") .post(Entity.entity("{\"records\":[{\"value\":{\"foo\":\"bar\"}}]}", "application/vnd.kafka.json.v2+json"), String.class); assertThat(response).isEqualToIgnoringCase("{\"offsets\":[{\"partition\":0,\"offset\":0,\"error_code\":null,\"error\":null}],\"key_schema_id\":null,\"value_schema_id\":null}"); - response = client.target("http://" + configuration.getString(CONFLUENT_REST_HOST_KEY) + ":" + configuration.getString(CONFLUENT_REST_PORT_KEY) + "/consumers/my_json_consumer") + response = client.target(restUrlPrefix + "/consumers/my_json_consumer") .request("application/vnd.kafka.v2+json") .post(Entity.entity("{\"name\": \"my_consumer_instance\", \"format\": \"json\", \"auto.offset.reset\": \"earliest\"}", "application/vnd.kafka.json.v2+json"), String.class); assertThat(response).isEqualToIgnoringCase("{\"instance_id\":\"my_consumer_instance\",\"base_uri\":\"http://127.0.0.1:8082/consumers/my_json_consumer/instances/my_consumer_instance\"}"); - client.target("http://" + configuration.getString(CONFLUENT_REST_HOST_KEY) + ":" + configuration.getString(CONFLUENT_REST_PORT_KEY) + "/consumers/my_json_consumer/instances/my_consumer_instance/subscription") + client.target(restUrlPrefix + "/consumers/my_json_consumer/instances/my_consumer_instance/subscription") .request("application/vnd.kafka.v2+json") .post(Entity.entity("{\"topics\":[\"jsontest\"]}", "application/vnd.kafka.json.v2+json"), String.class); - client.target("http://" + configuration.getString(CONFLUENT_REST_HOST_KEY) + ":" + configuration.getString(CONFLUENT_REST_PORT_KEY) + "/consumers/my_json_consumer/instances/my_consumer_instance/records") + client.target(restUrlPrefix + "/consumers/my_json_consumer/instances/my_consumer_instance/records") .request("application/vnd.kafka.json.v2+json") .get(String.class); - response = client.target("http://" + configuration.getString(CONFLUENT_REST_HOST_KEY) + ":" + configuration.getString(CONFLUENT_REST_PORT_KEY) + "/consumers/my_json_consumer/instances/my_consumer_instance/records") + response = client.target(restUrlPrefix + "/consumers/my_json_consumer/instances/my_consumer_instance/records") .request("application/vnd.kafka.json.v2+json") .get(String.class); assertThat(response).isEqualToIgnoringCase("[{\"topic\":\"jsontest\",\"key\":null,\"value\":{\"foo\":\"bar\"},\"partition\":0,\"offset\":0}]"); - client.target("http://" + configuration.getString(CONFLUENT_REST_HOST_KEY) + ":" + configuration.getString(CONFLUENT_REST_PORT_KEY) + "/consumers/my_json_consumer/instances/my_consumer_instance") + client.target(restUrlPrefix + "/consumers/my_json_consumer/instances/my_consumer_instance") .request("application/vnd.kafka.v2+json") .delete(String.class); } @@ -213,7 +213,6 @@ public void kafkaKsql_should_be_ok() { // Assert.assertTrue(ksqlEntityList.get(0) instanceof ExecutionPlan); - results = ksqlRestClient.makeKsqlRequest("DESCRIBE pageviews_original;"); Assert.assertNotNull(results); Assert.assertTrue(results.isSuccessful()); diff --git a/hadoop-unit-elasticsearch/src/test/java/fr/jetoile/hadoopunit/component/ElasticSearchBootstrapTest.java b/hadoop-unit-elasticsearch/src/test/java/fr/jetoile/hadoopunit/component/ElasticSearchBootstrapTest.java index 28044ff3..fa1140f4 100644 --- a/hadoop-unit-elasticsearch/src/test/java/fr/jetoile/hadoopunit/component/ElasticSearchBootstrapTest.java +++ b/hadoop-unit-elasticsearch/src/test/java/fr/jetoile/hadoopunit/component/ElasticSearchBootstrapTest.java @@ -15,9 +15,8 @@ package fr.jetoile.hadoopunit.component; import com.fasterxml.jackson.databind.ObjectMapper; -import fr.jetoile.hadoopunit.Component; -import fr.jetoile.hadoopunit.HadoopUnitConfig; import fr.jetoile.hadoopunit.HadoopBootstrap; +import fr.jetoile.hadoopunit.HadoopUnitConfig; import fr.jetoile.hadoopunit.exception.BootstrapException; import fr.jetoile.hadoopunit.exception.NotFoundServiceException; import org.apache.commons.configuration.Configuration; @@ -141,7 +140,7 @@ public boolean equals(Object obj) { if (this.value != sample.value && this.value != null && !this.value.equals(sample.value)) return false; - if (this.size != sample.size ) return false; + if (this.size != sample.size) return false; if (this.price != sample.price) return false; return true; diff --git a/hadoop-unit-redis/src/main/java/fr/jetoile/hadoopunit/component/RedisBootstrap.java b/hadoop-unit-redis/src/main/java/fr/jetoile/hadoopunit/component/RedisBootstrap.java index 20bd0dc0..f14684e6 100644 --- a/hadoop-unit-redis/src/main/java/fr/jetoile/hadoopunit/component/RedisBootstrap.java +++ b/hadoop-unit-redis/src/main/java/fr/jetoile/hadoopunit/component/RedisBootstrap.java @@ -28,8 +28,6 @@ import net.ishiis.redis.unit.config.RedisSentinelConfig; import net.ishiis.redis.unit.config.RedisServerConfig; import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -103,11 +101,11 @@ public String getName() { @Override public String getProperties() { return - "\n \t\t\t masterPort:" + masterPort + - "\n \t\t\t version:" + version + - "\n \t\t\t type:" + type.name() + - (slavePorts.size() != 0 && type!=RedisType.SERVER ? "\n \t\t\t slavePorts: " + slavePorts : "") + - (sentinelPorts.size() != 0 && type == RedisType.SENTINEL ? "\n \t\t\t sentinelPorts: " + sentinelPorts : ""); + "\n \t\t\t masterPort:" + masterPort + + "\n \t\t\t version:" + version + + "\n \t\t\t type:" + type.name() + + (slavePorts.size() != 0 && type != RedisType.SERVER ? "\n \t\t\t slavePorts: " + slavePorts : "") + + (sentinelPorts.size() != 0 && type == RedisType.SENTINEL ? "\n \t\t\t sentinelPorts: " + sentinelPorts : ""); } private void loadConfig() throws BootstrapException {