Skip to content

Commit

Permalink
Merge pull request #603 from maxemann96/Issue602
Browse files Browse the repository at this point in the history
Closes #602
  • Loading branch information
majst01 committed Jun 7, 2019
2 parents 3169621 + 69d5fbd commit cc469d4
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 24 deletions.
28 changes: 23 additions & 5 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.influxdb.impl;


import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
import okhttp3.Headers;
Expand All @@ -12,7 +11,6 @@
import okhttp3.logging.HttpLoggingInterceptor;
import okhttp3.logging.HttpLoggingInterceptor.Level;
import okio.BufferedSource;

import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBException;
Expand All @@ -27,7 +25,6 @@
import org.influxdb.impl.BatchProcessor.UdpBatchEntry;
import org.influxdb.msgpack.MessagePackConverterFactory;
import org.influxdb.msgpack.MessagePackTraverser;

import retrofit2.Call;
import retrofit2.Callback;
import retrofit2.Converter.Factory;
Expand All @@ -48,8 +45,8 @@
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -561,7 +558,28 @@ public void query(final Query query, final Consumer<QueryResult> onSuccess, fina
call.enqueue(new Callback<QueryResult>() {
@Override
public void onResponse(final Call<QueryResult> call, final Response<QueryResult> response) {
onSuccess.accept(response.body());
if (response.isSuccessful()) {
onSuccess.accept(response.body());
} else {
Throwable t = null;
String errorBody = null;

try {
if (response.errorBody() != null) {
errorBody = response.errorBody().string();
}
} catch (IOException e) {
t = e;
}

if (t != null) {
onFailure.accept(new InfluxDBException(response.message(), t));
} else if (errorBody != null) {
onFailure.accept(new InfluxDBException(response.message() + " - " + errorBody));
} else {
onFailure.accept(new InfluxDBException(response.message()));
}
}
}

@Override
Expand Down
61 changes: 42 additions & 19 deletions src/test/java/org/influxdb/InfluxDBTest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.influxdb;

import java.util.Collections;
import okhttp3.OkHttpClient;
import org.influxdb.InfluxDB.LogLevel;
import org.influxdb.InfluxDB.ResponseFormat;
import org.influxdb.dto.BatchPoints;
Expand All @@ -19,15 +19,14 @@
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;

import okhttp3.OkHttpClient;

import java.io.IOException;
import java.net.ConnectException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
Expand All @@ -40,6 +39,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.regex.Pattern;

/**
* Test the InfluxDB API.
Expand Down Expand Up @@ -175,24 +175,47 @@ public void testBoundParameterQuery() throws InterruptedException {
}
}

/**
* Tests for callback query.
*/
@Test
public void testCallbackQuery() throws Throwable {
final AsyncResult<QueryResult> result = new AsyncResult<>();
final Consumer<QueryResult> firstQueryConsumer = new Consumer<QueryResult>() {
@Override
public void accept(QueryResult queryResult) {
influxDB.query(new Query("DROP DATABASE mydb2", "mydb"), result.resultConsumer, result.errorConsumer);
}
};
/**
* Tests for callback query.
*/
@Test
public void testCallbackQuery() throws Throwable {
final AsyncResult<QueryResult> result = new AsyncResult<>();
final Consumer<QueryResult> firstQueryConsumer = new Consumer<QueryResult>() {
@Override
public void accept(QueryResult queryResult) {
influxDB.query(new Query("DROP DATABASE mydb2", "mydb"), result.resultConsumer, result.errorConsumer);
}
};

this.influxDB.query(new Query("CREATE DATABASE mydb2", "mydb"), firstQueryConsumer, result.errorConsumer);
this.influxDB.query(new Query("CREATE DATABASE mydb2", "mydb"), firstQueryConsumer, result.errorConsumer);

// Will throw exception in case of error.
result.result();
}
// Will throw exception in case of error.
result.result();
}

/**
* Tests for callback query with a failure.
* see Issue #602
*/
@Test
public void testCallbackQueryFailureHandling() throws Throwable {
final AsyncResult<QueryResult> res = new AsyncResult<>();

this.influxDB.query(new Query("SHOW SERRIES"), res.resultConsumer, res.errorConsumer);

try{
res.result();
Assertions.fail("Malformed query should throw InfluxDBException");
}
catch (InfluxDBException e){
Pattern errorPattern = Pattern.compile("Bad Request.*error parsing query: found SERRIES, expected.*",
Pattern.DOTALL);

Assertions.assertTrue(errorPattern.matcher(e.getMessage()).matches(),
"Error string \"" + e.getMessage() + "\" does not match error pattern");
}
}

/**
* Test that describe Databases works.
Expand Down

0 comments on commit cc469d4

Please sign in to comment.