diff --git a/CHANGELOG.md b/CHANGELOG.md index bcdff3971..107ac5b20 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ [Issue #451](https://github.com/influxdata/influxdb-java/issues/451) - @Column supports class inheritance [Issue #367](https://github.com/influxdata/influxdb-java/issues/367) +- BatchOptions to have .precision() + [Issue #532](https://github.com/influxdata/influxdb-java/issues/532) ## 2.14 [2018-10-12] diff --git a/src/main/java/org/influxdb/BatchOptions.java b/src/main/java/org/influxdb/BatchOptions.java index 90c1add32..8722d663b 100644 --- a/src/main/java/org/influxdb/BatchOptions.java +++ b/src/main/java/org/influxdb/BatchOptions.java @@ -4,6 +4,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; /** @@ -12,6 +13,13 @@ */ public final class BatchOptions implements Cloneable { + // default values here are consistent with Telegraf + public static final int DEFAULT_BATCH_ACTIONS_LIMIT = 1000; + public static final int DEFAULT_BATCH_INTERVAL_DURATION = 1000; + public static final int DEFAULT_JITTER_INTERVAL_DURATION = 0; + public static final int DEFAULT_BUFFER_LIMIT = 10000; + public static final TimeUnit DEFAULT_PRECISION = TimeUnit.NANOSECONDS; + /** * Default batch options. This class is immutable, each configuration * is built by taking the DEFAULTS and setting specific configuration @@ -19,16 +27,11 @@ public final class BatchOptions implements Cloneable { */ public static final BatchOptions DEFAULTS = new BatchOptions(); - // default values here are consistent with Telegraf - public static final int DEFAULT_BATCH_ACTIONS_LIMIT = 1000; - public static final int DEFAULT_BATCH_INTERVAL_DURATION = 1000; - public static final int DEFAULT_JITTER_INTERVAL_DURATION = 0; - public static final int DEFAULT_BUFFER_LIMIT = 10000; - private int actions = DEFAULT_BATCH_ACTIONS_LIMIT; private int flushDuration = DEFAULT_BATCH_INTERVAL_DURATION; private int jitterDuration = DEFAULT_JITTER_INTERVAL_DURATION; private int bufferLimit = DEFAULT_BUFFER_LIMIT; + private TimeUnit precision = DEFAULT_PRECISION; private ThreadFactory threadFactory = Executors.defaultThreadFactory(); BiConsumer, Throwable> exceptionHandler = (points, throwable) -> { @@ -119,6 +122,17 @@ public BatchOptions consistency(final InfluxDB.ConsistencyLevel consistency) { return clone; } + /** + * Set the time precision to use for the whole batch. If unspecified, will default to {@link TimeUnit#NANOSECONDS}. + * @param precision sets the precision to use + * @return the BatchOptions instance to be able to use it in a fluent manner. + */ + public BatchOptions precision(final TimeUnit precision) { + BatchOptions clone = getClone(); + clone.precision = precision; + return clone; + } + /** * @return actions the number of actions to collect */ @@ -168,6 +182,14 @@ public BiConsumer, Throwable> getExceptionHandler() { public InfluxDB.ConsistencyLevel getConsistency() { return consistency; } + + /** + * @return the time precision + */ + public TimeUnit getPrecision() { + return precision; + } + private BatchOptions getClone() { try { diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index cc2e7c478..53cc40dcc 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -41,6 +41,7 @@ public final class BatchProcessor { private final int flushInterval; private final ConsistencyLevel consistencyLevel; private final int jitterInterval; + private final TimeUnit precision; private final BatchWriter batchWriter; /** @@ -56,6 +57,7 @@ public static final class Builder { // this is a default value if the InfluxDb.enableBatch(BatchOptions) IS NOT used // the reason is backward compatibility private int bufferLimit = 0; + private TimeUnit precision; private BiConsumer, Throwable> exceptionHandler = (entries, throwable) -> { }; private ConsistencyLevel consistencyLevel; @@ -149,18 +151,32 @@ public Builder exceptionHandler(final BiConsumer, Throwable> han this.exceptionHandler = handler; return this; } - /** - * Consistency level for batch write. - * - * @param consistencyLevel - * the consistencyLevel - * - * @return this Builder to use it fluent - */ - public Builder consistencyLevel(final ConsistencyLevel consistencyLevel) { - this.consistencyLevel = consistencyLevel; - return this; - } + + /** + * Consistency level for batch write. + * + * @param consistencyLevel + * the consistencyLevel + * + * @return this Builder to use it fluent + */ + public Builder consistencyLevel(final ConsistencyLevel consistencyLevel) { + this.consistencyLevel = consistencyLevel; + return this; + } + + /** + * Set the time precision to use for the batch. + * + * @param precision + * the precision + * + * @return this Builder to use it fluent + */ + public Builder precision(final TimeUnit precision) { + this.precision = precision; + return this; + } /** * Create the BatchProcessor. @@ -183,7 +199,8 @@ public BatchProcessor build() { batchWriter = new OneShotBatchWriter(this.influxDB); } return new BatchProcessor(this.influxDB, batchWriter, this.threadFactory, this.actions, this.flushIntervalUnit, - this.flushInterval, this.jitterInterval, exceptionHandler, this.consistencyLevel); + this.flushInterval, this.jitterInterval, exceptionHandler, this.consistencyLevel, + this.precision); } } @@ -245,7 +262,7 @@ public static Builder builder(final InfluxDB influxDB) { BatchProcessor(final InfluxDBImpl influxDB, final BatchWriter batchWriter, final ThreadFactory threadFactory, final int actions, final TimeUnit flushIntervalUnit, final int flushInterval, final int jitterInterval, final BiConsumer, Throwable> exceptionHandler, - final ConsistencyLevel consistencyLevel) { + final ConsistencyLevel consistencyLevel, TimeUnit precision) { super(); this.influxDB = influxDB; this.batchWriter = batchWriter; @@ -256,6 +273,7 @@ public static Builder builder(final InfluxDB influxDB) { this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory); this.exceptionHandler = exceptionHandler; this.consistencyLevel = consistencyLevel; + this.precision = precision; if (actions > 1 && actions < Integer.MAX_VALUE) { this.queue = new LinkedBlockingQueue<>(actions); } else { @@ -303,7 +321,8 @@ void write() { String batchKey = dbName + "_" + rp; if (!batchKeyToBatchPoints.containsKey(batchKey)) { BatchPoints batchPoints = BatchPoints.database(dbName) - .retentionPolicy(rp).consistency(getConsistencyLevel()).build(); + .retentionPolicy(rp).consistency(getConsistencyLevel()) + .precision(getPrecision()).build(); batchKeyToBatchPoints.put(batchKey, batchPoints); } batchKeyToBatchPoints.get(batchKey).point(point); @@ -376,6 +395,10 @@ public ConsistencyLevel getConsistencyLevel() { return consistencyLevel; } + public TimeUnit getPrecision() { + return precision; + } + BatchWriter getBatchWriter() { return batchWriter; } diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 5ae1a7a04..306c5adc5 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -304,6 +304,7 @@ public InfluxDB enableBatch(final BatchOptions batchOptions) { .threadFactory(batchOptions.getThreadFactory()) .bufferLimit(batchOptions.getBufferLimit()) .consistencyLevel(batchOptions.getConsistency()) + .precision(batchOptions.getPrecision()) .build(); this.batchEnabled.set(true); return this; diff --git a/src/test/java/org/influxdb/impl/BatchProcessorTest.java b/src/test/java/org/influxdb/impl/BatchProcessorTest.java index 8a17245f0..75e8f5af7 100644 --- a/src/test/java/org/influxdb/impl/BatchProcessorTest.java +++ b/src/test/java/org/influxdb/impl/BatchProcessorTest.java @@ -1,29 +1,39 @@ package org.influxdb.impl; -import static org.mockito.Mockito.any; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.hamcrest.MockitoHamcrest.argThat; -import org.hamcrest.Matchers; import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Collection; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; +import org.hamcrest.Matchers; +import org.influxdb.BatchOptions; import org.influxdb.InfluxDB; +import org.influxdb.TestUtils; import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.platform.runner.JUnitPlatform; import org.junit.runner.RunWith; - -import static org.junit.Assert.assertNull; -import static org.hamcrest.CoreMatchers.*; -import static org.junit.Assert.assertThat; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; @RunWith(JUnitPlatform.class) @@ -159,4 +169,60 @@ public void testConsistencyLevelUpdated() throws InterruptedException, IOExcepti assertThat(batchProcessor.getConsistencyLevel(), is(equalTo(InfluxDB.ConsistencyLevel.ANY))); } + @Test + @SuppressWarnings("unchecked") + public void precision() throws Exception { + String dbName = "write_unittest_" + System.currentTimeMillis(); + String rpName = "somePolicy"; + BatchWriter batchWriter; + try (InfluxDB influxDB = TestUtils.connectToInfluxDB()) { + try { + influxDB.createDatabase(dbName); + influxDB.createRetentionPolicy(rpName, dbName, "30h", 2, true); + + influxDB.enableBatch(BatchOptions.DEFAULTS.actions(2000).precision(TimeUnit.SECONDS).flushDuration(100)); + + BatchProcessor batchProcessor = getPrivateField(influxDB, "batchProcessor"); + BatchWriter originalBatchWriter = getPrivateField(batchProcessor, "batchWriter"); + batchWriter = Mockito.spy(originalBatchWriter); + setPrivateField(batchProcessor, "batchWriter", batchWriter); + + Point point1 = Point.measurement("cpu") + .time(System.currentTimeMillis() /1000, TimeUnit.SECONDS) + .addField("idle", 90L) + .addField("user", 9L) + .addField("system", 1L) + .build(); + + influxDB.write(dbName, rpName, point1); + + } finally { + influxDB.deleteDatabase(dbName); + } + } + + ArgumentCaptor> argument = ArgumentCaptor.forClass(Collection.class); + + verify(batchWriter, times(2)).write(argument.capture()); + + for (Collection list : argument.getAllValues()) { + for (BatchPoints p : list) { + assertTrue(p.toString().contains("precision=SECONDS")); + assertFalse(p.toString().contains("precision=NANOSECONDS")); + } + } + } + + @SuppressWarnings("unchecked") + static T getPrivateField(final Object obj, final String name) throws Exception { + Field field = obj.getClass().getDeclaredField(name); + field.setAccessible(true); + return (T) field.get(obj); + } + + static void setPrivateField(final Object obj, final String name, final Object value) throws Exception { + Field field = obj.getClass().getDeclaredField(name); + field.setAccessible(true); + field.set(obj, value); + } }