Skip to content

Commit

Permalink
clean code
Browse files Browse the repository at this point in the history
  • Loading branch information
jetoile committed May 20, 2018
1 parent 4e1864c commit eca5775
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@

package fr.jetoile.hadoopunit.component;

import fr.jetoile.hadoopunit.Component;
import fr.jetoile.hadoopunit.HadoopBootstrap;
import fr.jetoile.hadoopunit.HadoopUnitConfig;
import fr.jetoile.hadoopunit.exception.BootstrapException;
import fr.jetoile.hadoopunit.exception.NotFoundServiceException;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.ExecutionPlan;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
Expand All @@ -43,10 +40,8 @@
public class ConfluentBootstrapTest {

private static final Logger LOGGER = LoggerFactory.getLogger(ConfluentBootstrapTest.class);

static private Configuration configuration;


@BeforeClass
public static void setup() throws BootstrapException {
HadoopBootstrap.INSTANCE.startAll();
Expand All @@ -58,101 +53,103 @@ public static void setup() throws BootstrapException {
}
}


@AfterClass
public static void tearDown() throws BootstrapException {
public static void tearDown() {
HadoopBootstrap.INSTANCE.stopAll();
}

@Test
public void schemaRegistry_should_be_ok() {
Client client = ClientBuilder.newClient();
String response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects/Kafka-key/versions")

String schemaRegistryUrlPrefix = "http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY);

String response = client.target(schemaRegistryUrlPrefix + "/subjects/Kafka-key/versions")
.request("application/vnd.schemaregistry.v1+json")
.post(Entity.entity("{\"schema\": \"{\\\"type\\\": \\\"string\\\"}\"}", "application/vnd.schemaregistry.v1+json"), String.class);
assertThat(response).isEqualToIgnoringCase("{\"id\":1}");

response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects/Kafka-value/versions")
response = client.target(schemaRegistryUrlPrefix + "/subjects/Kafka-value/versions")
.request("application/vnd.schemaregistry.v1+json")
.post(Entity.entity("{\"schema\": \"{\\\"type\\\": \\\"string\\\"}\"}", "application/vnd.schemaregistry.v1+json"), String.class);
assertThat(response).isEqualToIgnoringCase("{\"id\":1}");

response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects")
response = client.target(schemaRegistryUrlPrefix + "/subjects")
.request("application/vnd.schemaregistry.v1+json")
.get(String.class);
assertThat(response).isEqualToIgnoringCase("[\"Kafka-value\",\"Kafka-key\"]");

response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/schemas/ids/1")
response = client.target(schemaRegistryUrlPrefix + "/schemas/ids/1")
.request("application/vnd.schemaregistry.v1+json")
.get(String.class);
assertThat(response).isEqualToIgnoringCase("{\"schema\":\"\\\"string\\\"\"}");

response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects/Kafka-value/versions")
response = client.target(schemaRegistryUrlPrefix + "/subjects/Kafka-value/versions")
.request("application/vnd.schemaregistry.v1+json")
.get(String.class);
assertThat(response).isEqualToIgnoringCase("[1]");

response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects/Kafka-value/versions/1")
response = client.target(schemaRegistryUrlPrefix + "/subjects/Kafka-value/versions/1")
.request("application/vnd.schemaregistry.v1+json")
.get(String.class);
assertThat(response).isEqualToIgnoringCase("{\"subject\":\"Kafka-value\",\"version\":1,\"id\":1,\"schema\":\"\\\"string\\\"\"}");

response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects/Kafka-value/versions/1")
response = client.target(schemaRegistryUrlPrefix + "/subjects/Kafka-value/versions/1")
.request("application/vnd.schemaregistry.v1+json")
.delete(String.class);
assertThat(response).isEqualToIgnoringCase("1");

response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects/Kafka-value/versions")
response = client.target(schemaRegistryUrlPrefix + "/subjects/Kafka-value/versions")
.request("application/vnd.schemaregistry.v1+json")
.post(Entity.entity("{\"schema\": \"{\\\"type\\\": \\\"string\\\"}\"}", "application/vnd.schemaregistry.v1+json"), String.class);
assertThat(response).isEqualToIgnoringCase("{\"id\":1}");

response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects/Kafka-value/versions/latest")
response = client.target(schemaRegistryUrlPrefix + "/subjects/Kafka-value/versions/latest")
.request("application/vnd.schemaregistry.v1+json")
.delete(String.class);
assertThat(response).isEqualToIgnoringCase("2");

response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects/Kafka-value/versions")
response = client.target(schemaRegistryUrlPrefix + "/subjects/Kafka-value/versions")
.request("application/vnd.schemaregistry.v1+json")
.post(Entity.entity("{\"schema\": \"{\\\"type\\\": \\\"string\\\"}\"}", "application/vnd.schemaregistry.v1+json"), String.class);
assertThat(response).isEqualToIgnoringCase("{\"id\":1}");

response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/schemas/ids/1")
response = client.target(schemaRegistryUrlPrefix + "/schemas/ids/1")
.request("application/vnd.schemaregistry.v1+json")
.get(String.class);
assertThat(response).isEqualToIgnoringCase("{\"schema\":\"\\\"string\\\"\"}");

response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects/Kafka-key")
response = client.target(schemaRegistryUrlPrefix + "/subjects/Kafka-key")
.request("application/vnd.schemaregistry.v1+json")
.post(Entity.entity("{\"schema\": \"{\\\"type\\\": \\\"string\\\"}\"}", "application/vnd.schemaregistry.v1+json"), String.class);
assertThat(response).isEqualToIgnoringCase("{\"subject\":\"Kafka-key\",\"version\":1,\"id\":1,\"schema\":\"\\\"string\\\"\"}");

response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/compatibility/subjects/Kafka-value/versions/latest")
response = client.target(schemaRegistryUrlPrefix + "/compatibility/subjects/Kafka-value/versions/latest")
.request("application/vnd.schemaregistry.v1+json")
.post(Entity.entity("{\"schema\": \"{\\\"type\\\": \\\"string\\\"}\"}", "application/vnd.schemaregistry.v1+json"), String.class);
assertThat(response).isEqualToIgnoringCase("{\"is_compatible\":true}");

response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/config")
response = client.target(schemaRegistryUrlPrefix + "/config")
.request("application/vnd.schemaregistry.v1+json")
.get(String.class);
assertThat(response).isEqualToIgnoringCase("{\"compatibilityLevel\":\"BACKWARD\"}");

response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/config")
response = client.target(schemaRegistryUrlPrefix + "/config")
.request("application/vnd.schemaregistry.v1+json")
.put(Entity.entity("{\"compatibility\": \"NONE\"}", "application/vnd.schemaregistry.v1+json"), String.class);
assertThat(response).isEqualToIgnoringCase("{\"compatibility\":\"NONE\"}");

response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/config/Kafka-value")
response = client.target(schemaRegistryUrlPrefix + "/config/Kafka-value")
.request("application/vnd.schemaregistry.v1+json")
.put(Entity.entity("{\"compatibility\": \"BACKWARD\"}", "application/vnd.schemaregistry.v1+json"), String.class);
assertThat(response).isEqualToIgnoringCase("{\"compatibility\":\"BACKWARD\"}");

response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects/Kafka-value")
response = client.target(schemaRegistryUrlPrefix + "/subjects/Kafka-value")
.request("application/vnd.schemaregistry.v1+json")
.delete(String.class);
assertThat(response).isEqualToIgnoringCase("[3]");

response = client.target("http://" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_HOST_KEY) + ":" + configuration.getString(CONFLUENT_SCHEMAREGISTRY_PORT_KEY) + "/subjects")
response = client.target(schemaRegistryUrlPrefix + "/subjects")
.request("application/vnd.schemaregistry.v1+json")
.get(String.class);
assertThat(response).isEqualToIgnoringCase("[\"Kafka-key\"]");
Expand All @@ -162,31 +159,34 @@ public void schemaRegistry_should_be_ok() {
public void kafkaRest_should_be_ok() {

Client client = ClientBuilder.newClient();
String response = client.target("http://" + configuration.getString(CONFLUENT_REST_HOST_KEY) + ":" + configuration.getString(CONFLUENT_REST_PORT_KEY) + "/topics/jsontest")

String restUrlPrefix = "http://" + configuration.getString(CONFLUENT_REST_HOST_KEY) + ":" + configuration.getString(CONFLUENT_REST_PORT_KEY);

String response = client.target(restUrlPrefix + "/topics/jsontest")
.request("application/vnd.kafka.v2+json")
.accept("application/vnd.kafka.v2+json")
.post(Entity.entity("{\"records\":[{\"value\":{\"foo\":\"bar\"}}]}", "application/vnd.kafka.json.v2+json"), String.class);
assertThat(response).isEqualToIgnoringCase("{\"offsets\":[{\"partition\":0,\"offset\":0,\"error_code\":null,\"error\":null}],\"key_schema_id\":null,\"value_schema_id\":null}");

response = client.target("http://" + configuration.getString(CONFLUENT_REST_HOST_KEY) + ":" + configuration.getString(CONFLUENT_REST_PORT_KEY) + "/consumers/my_json_consumer")
response = client.target(restUrlPrefix + "/consumers/my_json_consumer")
.request("application/vnd.kafka.v2+json")
.post(Entity.entity("{\"name\": \"my_consumer_instance\", \"format\": \"json\", \"auto.offset.reset\": \"earliest\"}", "application/vnd.kafka.json.v2+json"), String.class);
assertThat(response).isEqualToIgnoringCase("{\"instance_id\":\"my_consumer_instance\",\"base_uri\":\"http://127.0.0.1:8082/consumers/my_json_consumer/instances/my_consumer_instance\"}");

client.target("http://" + configuration.getString(CONFLUENT_REST_HOST_KEY) + ":" + configuration.getString(CONFLUENT_REST_PORT_KEY) + "/consumers/my_json_consumer/instances/my_consumer_instance/subscription")
client.target(restUrlPrefix + "/consumers/my_json_consumer/instances/my_consumer_instance/subscription")
.request("application/vnd.kafka.v2+json")
.post(Entity.entity("{\"topics\":[\"jsontest\"]}", "application/vnd.kafka.json.v2+json"), String.class);

client.target("http://" + configuration.getString(CONFLUENT_REST_HOST_KEY) + ":" + configuration.getString(CONFLUENT_REST_PORT_KEY) + "/consumers/my_json_consumer/instances/my_consumer_instance/records")
client.target(restUrlPrefix + "/consumers/my_json_consumer/instances/my_consumer_instance/records")
.request("application/vnd.kafka.json.v2+json")
.get(String.class);

response = client.target("http://" + configuration.getString(CONFLUENT_REST_HOST_KEY) + ":" + configuration.getString(CONFLUENT_REST_PORT_KEY) + "/consumers/my_json_consumer/instances/my_consumer_instance/records")
response = client.target(restUrlPrefix + "/consumers/my_json_consumer/instances/my_consumer_instance/records")
.request("application/vnd.kafka.json.v2+json")
.get(String.class);
assertThat(response).isEqualToIgnoringCase("[{\"topic\":\"jsontest\",\"key\":null,\"value\":{\"foo\":\"bar\"},\"partition\":0,\"offset\":0}]");

client.target("http://" + configuration.getString(CONFLUENT_REST_HOST_KEY) + ":" + configuration.getString(CONFLUENT_REST_PORT_KEY) + "/consumers/my_json_consumer/instances/my_consumer_instance")
client.target(restUrlPrefix + "/consumers/my_json_consumer/instances/my_consumer_instance")
.request("application/vnd.kafka.v2+json")
.delete(String.class);
}
Expand All @@ -213,7 +213,6 @@ public void kafkaKsql_should_be_ok() {
// Assert.assertTrue(ksqlEntityList.get(0) instanceof ExecutionPlan);



results = ksqlRestClient.makeKsqlRequest("DESCRIBE pageviews_original;");
Assert.assertNotNull(results);
Assert.assertTrue(results.isSuccessful());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
package fr.jetoile.hadoopunit.component;

import com.fasterxml.jackson.databind.ObjectMapper;
import fr.jetoile.hadoopunit.Component;
import fr.jetoile.hadoopunit.HadoopUnitConfig;
import fr.jetoile.hadoopunit.HadoopBootstrap;
import fr.jetoile.hadoopunit.HadoopUnitConfig;
import fr.jetoile.hadoopunit.exception.BootstrapException;
import fr.jetoile.hadoopunit.exception.NotFoundServiceException;
import org.apache.commons.configuration.Configuration;
Expand Down Expand Up @@ -141,7 +140,7 @@ public boolean equals(Object obj) {

if (this.value != sample.value && this.value != null && !this.value.equals(sample.value)) return false;

if (this.size != sample.size ) return false;
if (this.size != sample.size) return false;
if (this.price != sample.price) return false;

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import net.ishiis.redis.unit.config.RedisSentinelConfig;
import net.ishiis.redis.unit.config.RedisServerConfig;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -103,11 +101,11 @@ public String getName() {
@Override
public String getProperties() {
return
"\n \t\t\t masterPort:" + masterPort +
"\n \t\t\t version:" + version +
"\n \t\t\t type:" + type.name() +
(slavePorts.size() != 0 && type!=RedisType.SERVER ? "\n \t\t\t slavePorts: " + slavePorts : "") +
(sentinelPorts.size() != 0 && type == RedisType.SENTINEL ? "\n \t\t\t sentinelPorts: " + sentinelPorts : "");
"\n \t\t\t masterPort:" + masterPort +
"\n \t\t\t version:" + version +
"\n \t\t\t type:" + type.name() +
(slavePorts.size() != 0 && type != RedisType.SERVER ? "\n \t\t\t slavePorts: " + slavePorts : "") +
(sentinelPorts.size() != 0 && type == RedisType.SENTINEL ? "\n \t\t\t sentinelPorts: " + sentinelPorts : "");
}

private void loadConfig() throws BootstrapException {
Expand Down

0 comments on commit eca5775

Please sign in to comment.