Skip to content

Commit

Permalink
Fixed exception propagation in query streaming influxdata#639
Browse files Browse the repository at this point in the history
  • Loading branch information
rhajek committed Nov 22, 2019
1 parent 5cdc9e3 commit f33dc3f
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 24 deletions.
6 changes: 6 additions & 0 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,13 @@ public boolean isCanceled() {
if (onFailure != null) {
onFailure.accept(e);
}
} catch (Exception e) {
call.cancel();
if (onFailure != null) {
onFailure.accept(e);
}
}

}

@Override
Expand Down
94 changes: 70 additions & 24 deletions src/test/java/org/influxdb/InfluxDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1023,31 +1023,77 @@ public void accept(QueryResult result) {
Assertions.assertEquals("DONE", result.getError());
}

/**
* Test chunking edge case.
* @throws InterruptedException
*/
@Test
public void testChunkingFail() throws InterruptedException {
if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) {
// do not test version 0.13 and 1.0
return;
}
String dbName = "write_unittest_" + System.currentTimeMillis();
this.influxDB.query(new Query("CREATE DATABASE " + dbName));
final CountDownLatch countDownLatch = new CountDownLatch(1);
Query query = new Query("UNKNOWN_QUERY", dbName);
this.influxDB.query(query, 10, new Consumer<QueryResult>() {
@Override
public void accept(QueryResult result) {
countDownLatch.countDown();
}
});
this.influxDB.query(new Query("DROP DATABASE " + dbName));
Assertions.assertFalse(countDownLatch.await(10, TimeUnit.SECONDS));
}
/**
* Test chunking edge case.
*
* @throws InterruptedException
*/
@Test
public void testChunkingFail() throws InterruptedException {
if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) {
// do not test version 0.13 and 1.0
return;
}
String dbName = "write_unittest_" + System.currentTimeMillis();
this.influxDB.query(new Query("CREATE DATABASE " + dbName));
final CountDownLatch countDownLatch = new CountDownLatch(1);
final CountDownLatch countDownLatchFailure = new CountDownLatch(1);
Query query = new Query("UNKNOWN_QUERY", dbName);
this.influxDB.query(query, 10,
(cancellable, queryResult) -> {
countDownLatch.countDown();
}, () -> {
},
throwable -> {
countDownLatchFailure.countDown();
});
this.influxDB.query(new Query("DROP DATABASE " + dbName));
Assertions.assertTrue(countDownLatchFailure.await(10, TimeUnit.SECONDS));
Assertions.assertFalse(countDownLatch.await(10, TimeUnit.SECONDS));
}

/**
@Test
public void testChunkingFailInConsumer() throws InterruptedException {
if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) {
// do not test version 0.13 and 1.0
return;
}
String dbName = "write_unittest_" + System.currentTimeMillis();
this.influxDB.query(new Query("CREATE DATABASE " + dbName));

String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version());
BatchPoints batchPoints = BatchPoints.database(dbName).retentionPolicy(rp).build();
Point point1 = Point.measurement("disk").tag("atag", "a").addField("used", 60L).addField("free", 1L).build();
Point point2 = Point.measurement("disk").tag("atag", "b").addField("used", 70L).addField("free", 2L).build();
Point point3 = Point.measurement("disk").tag("atag", "c").addField("used", 80L).addField("free", 3L).build();
batchPoints.point(point1);
batchPoints.point(point2);
batchPoints.point(point3);
this.influxDB.write(batchPoints);

final CountDownLatch countDownLatch = new CountDownLatch(1);
final CountDownLatch countDownLatchFailure = new CountDownLatch(1);
final CountDownLatch countDownLatchComplete = new CountDownLatch(1);
Query query = new Query("SELECT * FROM disk", dbName);
this.influxDB.query(query, 2,
(cancellable, queryResult) -> {
countDownLatch.countDown();
throw new RuntimeException("my error");
}, () -> {
countDownLatchComplete.countDown();
System.out.println("onComplete()");
},
throwable -> {
Assertions.assertEquals(throwable.getMessage(), "my error");
countDownLatchFailure.countDown();
});
this.influxDB.query(new Query("DROP DATABASE " + dbName));
Assertions.assertTrue(countDownLatchFailure.await(10, TimeUnit.SECONDS));
Assertions.assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
Assertions.assertFalse(countDownLatchComplete.await(10, TimeUnit.SECONDS));
}

/**
* Test chunking on 0.13 and 1.0.
* @throws InterruptedException
*/
Expand Down

0 comments on commit f33dc3f

Please sign in to comment.