Skip to content

Commit

Permalink
Merge pull request #282 from andyflury/feature-batchprocessor-capacity
Browse files Browse the repository at this point in the history
Initial capacity on LinkedBlockingQueue inside BatchProcessor
  • Loading branch information
majst01 committed Feb 10, 2017
2 parents a7fa63a + 1beafd3 commit ca347db
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
13 changes: 11 additions & 2 deletions src/main/java/org/influxdb/impl/BatchProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
public class BatchProcessor {

private static final Logger LOG = Logger.getLogger(BatchProcessor.class.getName());
protected final BlockingQueue<AbstractBatchEntry> queue = new LinkedBlockingQueue<>();
protected final BlockingQueue<AbstractBatchEntry> queue;
private final ScheduledExecutorService scheduler;
final InfluxDBImpl influxDB;
final int actions;
Expand Down Expand Up @@ -171,6 +171,11 @@ public static Builder builder(final InfluxDB influxDB) {
this.flushIntervalUnit = flushIntervalUnit;
this.flushInterval = flushInterval;
this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
if (actions > 1 && actions < Integer.MAX_VALUE) {
this.queue = new LinkedBlockingQueue<>(actions);
} else {
this.queue = new LinkedBlockingQueue<>();
}
// Flush at specified Rate
this.scheduler.scheduleAtFixedRate(new Runnable() {
@Override
Expand Down Expand Up @@ -238,7 +243,11 @@ void write() {
* the batchEntry to write to the cache.
*/
void put(final AbstractBatchEntry batchEntry) {
this.queue.add(batchEntry);
try {
this.queue.put(batchEntry);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (this.queue.size() >= this.actions) {
this.scheduler.submit(new Runnable() {
@Override
Expand Down
18 changes: 18 additions & 0 deletions src/test/java/org/influxdb/InfluxDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,24 @@ public void testAsyncWritePointThroughUDP() {
this.influxDB.disableBatch();
}
}


/**
* Test the implementation of {@link InfluxDB#write(int, Point)}'s async support.
*/
@Test(expected = RuntimeException.class)
public void testAsyncWritePointThroughUDPFail() {
this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS);
try{
Assert.assertTrue(this.influxDB.isBatchEnabled());
String measurement = TestUtils.getRandomMeasurement();
Point point = Point.measurement(measurement).tag("atag", "test").addField("used", 80L).addField("free", 1L).build();
Thread.currentThread().interrupt();
this.influxDB.write(UDP_PORT, point);
}finally{
this.influxDB.disableBatch();
}
}

/**
* Test writing to the database using string protocol.
Expand Down

0 comments on commit ca347db

Please sign in to comment.