Skip to content

Commit

Permalink
BatchOptions to have .precision()
Browse files Browse the repository at this point in the history
  • Loading branch information
asashour committed Feb 1, 2019
1 parent dffbea9 commit 3caa96d
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 27 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
[Issue #451](https://github.com/influxdata/influxdb-java/issues/451)
- @Column supports class inheritance
[Issue #367](https://github.com/influxdata/influxdb-java/issues/367)
- BatchOptions to have .precision()
[Issue #532](https://github.com/influxdata/influxdb-java/issues/532)

## 2.14 [2018-10-12]

Expand Down
34 changes: 28 additions & 6 deletions src/main/java/org/influxdb/BatchOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/**
Expand All @@ -12,23 +13,25 @@
*/
public final class BatchOptions implements Cloneable {

// default values here are consistent with Telegraf
public static final int DEFAULT_BATCH_ACTIONS_LIMIT = 1000;
public static final int DEFAULT_BATCH_INTERVAL_DURATION = 1000;
public static final int DEFAULT_JITTER_INTERVAL_DURATION = 0;
public static final int DEFAULT_BUFFER_LIMIT = 10000;
public static final TimeUnit DEFAULT_PRECISION = TimeUnit.NANOSECONDS;

/**
* Default batch options. This class is immutable, each configuration
* is built by taking the DEFAULTS and setting specific configuration
* properties.
*/
public static final BatchOptions DEFAULTS = new BatchOptions();

// default values here are consistent with Telegraf
public static final int DEFAULT_BATCH_ACTIONS_LIMIT = 1000;
public static final int DEFAULT_BATCH_INTERVAL_DURATION = 1000;
public static final int DEFAULT_JITTER_INTERVAL_DURATION = 0;
public static final int DEFAULT_BUFFER_LIMIT = 10000;

private int actions = DEFAULT_BATCH_ACTIONS_LIMIT;
private int flushDuration = DEFAULT_BATCH_INTERVAL_DURATION;
private int jitterDuration = DEFAULT_JITTER_INTERVAL_DURATION;
private int bufferLimit = DEFAULT_BUFFER_LIMIT;
private TimeUnit precision = DEFAULT_PRECISION;

private ThreadFactory threadFactory = Executors.defaultThreadFactory();
BiConsumer<Iterable<Point>, Throwable> exceptionHandler = (points, throwable) -> {
Expand Down Expand Up @@ -119,6 +122,17 @@ public BatchOptions consistency(final InfluxDB.ConsistencyLevel consistency) {
return clone;
}

/**
* Set the time precision to use for the whole batch. If unspecified, will default to {@link TimeUnit#NANOSECONDS}.
* @param precision sets the precision to use
* @return the BatchOptions instance to be able to use it in a fluent manner.
*/
public BatchOptions precision(final TimeUnit precision) {
BatchOptions clone = getClone();
clone.precision = precision;
return clone;
}

/**
* @return actions the number of actions to collect
*/
Expand Down Expand Up @@ -168,6 +182,14 @@ public BiConsumer<Iterable<Point>, Throwable> getExceptionHandler() {
public InfluxDB.ConsistencyLevel getConsistency() {
return consistency;
}

/**
* @return the time precision
*/
public TimeUnit getPrecision() {
return precision;
}


private BatchOptions getClone() {
try {
Expand Down
53 changes: 38 additions & 15 deletions src/main/java/org/influxdb/impl/BatchProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public final class BatchProcessor {
private final int flushInterval;
private final ConsistencyLevel consistencyLevel;
private final int jitterInterval;
private final TimeUnit precision;
private final BatchWriter batchWriter;

/**
Expand All @@ -56,6 +57,7 @@ public static final class Builder {
// this is a default value if the InfluxDb.enableBatch(BatchOptions) IS NOT used
// the reason is backward compatibility
private int bufferLimit = 0;
private TimeUnit precision;

private BiConsumer<Iterable<Point>, Throwable> exceptionHandler = (entries, throwable) -> { };
private ConsistencyLevel consistencyLevel;
Expand Down Expand Up @@ -149,18 +151,32 @@ public Builder exceptionHandler(final BiConsumer<Iterable<Point>, Throwable> han
this.exceptionHandler = handler;
return this;
}
/**
* Consistency level for batch write.
*
* @param consistencyLevel
* the consistencyLevel
*
* @return this Builder to use it fluent
*/
public Builder consistencyLevel(final ConsistencyLevel consistencyLevel) {
this.consistencyLevel = consistencyLevel;
return this;
}

/**
* Consistency level for batch write.
*
* @param consistencyLevel
* the consistencyLevel
*
* @return this Builder to use it fluent
*/
public Builder consistencyLevel(final ConsistencyLevel consistencyLevel) {
this.consistencyLevel = consistencyLevel;
return this;
}

/**
* Set the time precision to use for the batch.
*
* @param precision
* the precision
*
* @return this Builder to use it fluent
*/
public Builder precision(final TimeUnit precision) {
this.precision = precision;
return this;
}

/**
* Create the BatchProcessor.
Expand All @@ -183,7 +199,8 @@ public BatchProcessor build() {
batchWriter = new OneShotBatchWriter(this.influxDB);
}
return new BatchProcessor(this.influxDB, batchWriter, this.threadFactory, this.actions, this.flushIntervalUnit,
this.flushInterval, this.jitterInterval, exceptionHandler, this.consistencyLevel);
this.flushInterval, this.jitterInterval, exceptionHandler, this.consistencyLevel,
this.precision);
}
}

Expand Down Expand Up @@ -245,7 +262,7 @@ public static Builder builder(final InfluxDB influxDB) {
BatchProcessor(final InfluxDBImpl influxDB, final BatchWriter batchWriter, final ThreadFactory threadFactory,
final int actions, final TimeUnit flushIntervalUnit, final int flushInterval, final int jitterInterval,
final BiConsumer<Iterable<Point>, Throwable> exceptionHandler,
final ConsistencyLevel consistencyLevel) {
final ConsistencyLevel consistencyLevel, TimeUnit precision) {
super();
this.influxDB = influxDB;
this.batchWriter = batchWriter;
Expand All @@ -256,6 +273,7 @@ public static Builder builder(final InfluxDB influxDB) {
this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
this.exceptionHandler = exceptionHandler;
this.consistencyLevel = consistencyLevel;
this.precision = precision;
if (actions > 1 && actions < Integer.MAX_VALUE) {
this.queue = new LinkedBlockingQueue<>(actions);
} else {
Expand Down Expand Up @@ -303,7 +321,8 @@ void write() {
String batchKey = dbName + "_" + rp;
if (!batchKeyToBatchPoints.containsKey(batchKey)) {
BatchPoints batchPoints = BatchPoints.database(dbName)
.retentionPolicy(rp).consistency(getConsistencyLevel()).build();
.retentionPolicy(rp).consistency(getConsistencyLevel())
.precision(getPrecision()).build();
batchKeyToBatchPoints.put(batchKey, batchPoints);
}
batchKeyToBatchPoints.get(batchKey).point(point);
Expand Down Expand Up @@ -376,6 +395,10 @@ public ConsistencyLevel getConsistencyLevel() {
return consistencyLevel;
}

public TimeUnit getPrecision() {
return precision;
}

BatchWriter getBatchWriter() {
return batchWriter;
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ public InfluxDB enableBatch(final BatchOptions batchOptions) {
.threadFactory(batchOptions.getThreadFactory())
.bufferLimit(batchOptions.getBufferLimit())
.consistencyLevel(batchOptions.getConsistency())
.precision(batchOptions.getPrecision())
.build();
this.batchEnabled.set(true);
return this;
Expand Down
78 changes: 72 additions & 6 deletions src/test/java/org/influxdb/impl/BatchProcessorTest.java
Original file line number Diff line number Diff line change
@@ -1,29 +1,39 @@
package org.influxdb.impl;

import static org.mockito.Mockito.any;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.hamcrest.MockitoHamcrest.argThat;
import org.hamcrest.Matchers;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

import org.hamcrest.Matchers;
import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.TestUtils;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;

import static org.junit.Assert.assertNull;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.assertThat;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;


@RunWith(JUnitPlatform.class)
Expand Down Expand Up @@ -159,4 +169,60 @@ public void testConsistencyLevelUpdated() throws InterruptedException, IOExcepti
assertThat(batchProcessor.getConsistencyLevel(), is(equalTo(InfluxDB.ConsistencyLevel.ANY)));
}

@Test
@SuppressWarnings("unchecked")
public void precision() throws Exception {
String dbName = "write_unittest_" + System.currentTimeMillis();
String rpName = "somePolicy";
BatchWriter batchWriter;
try (InfluxDB influxDB = TestUtils.connectToInfluxDB()) {
try {
influxDB.createDatabase(dbName);
influxDB.createRetentionPolicy(rpName, dbName, "30h", 2, true);

influxDB.enableBatch(BatchOptions.DEFAULTS.actions(2000).precision(TimeUnit.SECONDS).flushDuration(100));

BatchProcessor batchProcessor = getPrivateField(influxDB, "batchProcessor");
BatchWriter originalBatchWriter = getPrivateField(batchProcessor, "batchWriter");
batchWriter = Mockito.spy(originalBatchWriter);
setPrivateField(batchProcessor, "batchWriter", batchWriter);

Point point1 = Point.measurement("cpu")
.time(System.currentTimeMillis() /1000, TimeUnit.SECONDS)
.addField("idle", 90L)
.addField("user", 9L)
.addField("system", 1L)
.build();

influxDB.write(dbName, rpName, point1);

} finally {
influxDB.deleteDatabase(dbName);
}
}

ArgumentCaptor<Collection<BatchPoints>> argument = ArgumentCaptor.forClass(Collection.class);

verify(batchWriter, times(2)).write(argument.capture());

for (Collection<BatchPoints> list : argument.getAllValues()) {
for (BatchPoints p : list) {
assertTrue(p.toString().contains("precision=SECONDS"));
assertFalse(p.toString().contains("precision=NANOSECONDS"));
}
}
}

@SuppressWarnings("unchecked")
static <T> T getPrivateField(final Object obj, final String name) throws Exception {
Field field = obj.getClass().getDeclaredField(name);
field.setAccessible(true);
return (T) field.get(obj);
}

static void setPrivateField(final Object obj, final String name, final Object value) throws Exception {
Field field = obj.getClass().getDeclaredField(name);
field.setAccessible(true);
field.set(obj, value);
}
}

0 comments on commit 3caa96d

Please sign in to comment.