Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data retention enhancement #108

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
558e127
Removed unnecessary 'this.' references
andrewdodd Oct 21, 2015
fcb7fd8
Changing attributes to full words
andrewdodd Oct 21, 2015
b93d0e3
Refactoring batched & unmatched writes
andrewdodd Oct 21, 2015
c7a49a3
Interface Change!
andrewdodd Oct 21, 2015
77a16a7
Update InfluxDBImpl.java
andrewdodd Oct 21, 2015
e8bd99b
Data Retention Options Issue - #107
andrewdodd Oct 21, 2015
2db394c
Data Retention Enhancement
Oct 21, 2015
7f8b497
Data Retention Enhancement
Oct 21, 2015
3c1e430
Introduce slf4j to allow logging
andrewdodd Oct 22, 2015
f3967d8
Merge branch 'master' of https://github.com/andrewdodd/influxdb-java
andrewdodd Oct 22, 2015
eef0c3a
Merge branch 'master' into DataRetentionEnhancement
andrewdodd Oct 22, 2015
aae92ce
index on DataRetentionEnhancement: eef0c3a Merge branch 'master' into…
andrewdodd Oct 22, 2015
02610e9
WIP on DataRetentionEnhancement: eef0c3a Merge branch 'master' into D…
andrewdodd Oct 22, 2015
75f8b83
Write of buffered points occur in worker thread
andrewdodd Oct 22, 2015
24a4ce5
Adding backoff to flush interval based retry
andrewdodd Oct 22, 2015
e890d98
Merge branch 'master' of https://github.com/influxdb/influxdb-java in…
andrewdodd Jan 17, 2016
1e5cdd6
Merge branch 'master' into DataRetentionEnhancement
andrewdodd Mar 3, 2016
5b4fe9e
Merge branch 'master' into DataRetentionEnhancement
andrewdodd Mar 25, 2016
9e2c5e9
Using correct default (should be `null` not `0`)
andrewdodd Apr 20, 2016
a60c0e5
Improving the names of the tests
andrewdodd Apr 20, 2016
788a99e
Merge branch 'master' into DataRetentionEnhancement
andrewdodd Apr 21, 2016
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -113,5 +113,10 @@
<artifactId>okhttp</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.12</version>
</dependency>
</dependencies>
</project>
137 changes: 122 additions & 15 deletions src/main/java/org/influxdb/InfluxDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;

import com.google.common.base.Optional;

/**
* Interface with all available methods to access a InfluxDB database.
*
Expand Down Expand Up @@ -66,6 +68,21 @@ public String value() {
return this.value;
}
}

/**
* Behaviour options for when a put fails to add to the buffer. This is
* particularly important when using capacity limited buffering.
*/
public enum BufferFailBehaviour {
/** Throw an exception if cannot add to buffer */
THROW_EXCEPTION,
/** Drop (do not add) the element attempting to be added */
DROP_CURRENT,
/** Drop the oldest element in the queue and add the current element */
DROP_OLDEST,
/** Block the thread until the space becomes available (NB: Not tested) */
BLOCK_THREAD,
}

/**
* Set the loglevel which is used for REST related actions.
Expand All @@ -75,20 +92,83 @@ public String value() {
* @return the InfluxDB instance to be able to use it in a fluent manner.
*/
public InfluxDB setLogLevel(final LogLevel logLevel);

/**
* Enable Batching of single Point writes to speed up writes significant. If either actions or
* flushDurations is reached first, a batchwrite is issued.
*
* @param actions
* the number of actions to collect
* @param flushDuration
* the time to wait at most.
* the minimun time to wait at most. NB: The 'maximum' time is set to 5 time this number.
* @param flushDurationTimeUnit
* @return the InfluxDB instance to be able to use it in a fluent manner.
*/
@Deprecated
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit);


/**
* Enable Batching of single Point writes to speed up writes significant. If either actions or
* flushDurations is reached first, a batchwrite is issued.
*
* @param actions
* the number of actions to collect
* @param flushIntervalMin
* the minimum time to wait before sending the batched writes.
* @param flushIntervalMax
* The maximum time, when backing off for failure, between sending batched writes.
* @param flushDurationTimeUnit
* @return the InfluxDB instance to be able to use it in a fluent manner.
*/
public InfluxDB enableBatch(final int flushActions,
final int flushIntervalMin,
final int flushIntervalMax,
final TimeUnit flushIntervalTimeUnit);

/**
* Enable Batching of single Point with a capacity limit. Batching provides
* a significant performance improvement in write speed. If either actions
* or flushDurations is reached first, a batchwrite is issued.
*
* This allows greater control over the behaviour when the capacity of the
* underlying buffer is limited.
*
* @param capacity
* the maximum number of points to hold. Should be NULL, for no
* buffering OR > 0 for buffering (NB: a capacity of 1 will not
* really buffer)
* @param flushActions
* the number of actions to collect before triggering a batched
* write
* @param flushIntervalMin
* the amount of time to wait before triggering a batched write
* @param flushIntervalMax
* the maximum amount of time to wait before triggering a batched write,
* when backing off due to failure
* @param flushIntervalTimeUnit
* the time unit for the flushIntervalMin and flushIntervalMax parameters
* @param behaviour
* the desired behaviour when capacity constrains are met
* @param discardOnFailedWrite
* if FALSE, the points from a failed batch write buffer will
* attempt to put them back onto the queue if TRUE, the points
* froma failed batch write will be discarded
* @param maxBatchWriteSize
* the maximum number of points to include in one batch write
* attempt. NB: this is different from the flushActions parameter, as
* the buffer can hold more than the flushActions parameter
* @return
*/
public InfluxDB enableBatch(final Integer capacity,
final int flushActions,
final int flushIntervalMin,
final int flushIntervalMax,
final TimeUnit flushIntervalTimeUnit,
BufferFailBehaviour behaviour,
boolean discardOnFailedWrite,
int maxBatchWriteSize);

/**
* Disable Batching.
*/
Expand All @@ -114,7 +194,7 @@ public String value() {
public String version();

/**
* Write a single Point to the database.
* Write a single Point to the database with ConsistencyLevel.One.
*
* @param database
* the database to write to.
Expand All @@ -123,36 +203,47 @@ public String value() {
* @param point
* The point to write
*/
@Deprecated
public void write(final String database, final String retentionPolicy, final Point point);

/**
* Write a single Point to the database.
*
* @param database
* @param retentionPolicy
* @param consistencyLevel
* @param point
*/
public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistencyLevel, final Point point);

/**
* Write a set of Points to the influxdb database with the new (>= 0.9.0rc32) lineprotocol.
*
* {@linkplain "https://github.com/influxdb/influxdb/pull/2696"}
*
*
* @param batchPoints
*/
@Deprecated
public void write(final BatchPoints batchPoints);

/**
* Write a set of Points to the influxdb database with the string records.
*
* Write a set of Points to the influxdb database with the new (>= 0.9.0rc32) lineprotocol.
*
* {@linkplain "https://github.com/influxdb/influxdb/pull/2696"}
*
* @param records
*
* @param batchPoints
*/
public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency, final String records);
public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistencyLevel, final List<Point> points);


/**
* Write a set of Points to the influxdb database with the list of string records.
* Write a set of Points to the influxdb database with the string records.
*
* {@linkplain "https://github.com/influxdb/influxdb/pull/2696"}
*
* @param records
*/
public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency, final List<String> records);

/**
public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency, final String records);

/**
* Execute a query agains a database.
Expand Down Expand Up @@ -196,4 +287,20 @@ public String value() {
*/
public List<String> describeDatabases();

}
/**
* Get the number of buffered points NB: If batching is not enabled this
* will return 0
*
* @return
*/
public int getBufferedCount();

/**
* Retrieves, but does not remove, the first element of the buffer
*
* @return an Optional<Point> containing the first element in the queue if
* it is present
*/
public Optional<Point> peekFirstBuffered();

}
1 change: 1 addition & 0 deletions src/main/java/org/influxdb/dto/BatchPoints.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* @author stefan
*
*/
@Deprecated
public class BatchPoints {
private String database;
private String retentionPolicy;
Expand Down
26 changes: 24 additions & 2 deletions src/main/java/org/influxdb/dto/Point.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.text.NumberFormat;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -62,7 +63,7 @@ public static final class Builder {
/**
* @param measurement
*/
Builder(final String measurement) {
protected Builder(final String measurement) {
this.measurement = measurement;
}

Expand Down Expand Up @@ -215,6 +216,10 @@ public Point build() {
void setMeasurement(final String measurement) {
this.measurement = measurement;
}

public String getMeasurement() {
return measurement;
}

/**
* @param time
Expand All @@ -223,6 +228,10 @@ void setMeasurement(final String measurement) {
void setTime(final Long time) {
this.time = time;
}

public Long getTime() {
return time;
}

/**
* @param tags
Expand All @@ -246,6 +255,10 @@ Map<String, String> getTags() {
void setPrecision(final TimeUnit precision) {
this.precision = precision;
}

public TimeUnit getPrecision() {
return precision;
}

/**
* @param fields
Expand Down Expand Up @@ -292,7 +305,7 @@ public String lineProtocol() {
sb.append(formatedTime());
return sb.toString();
}

private StringBuilder concatenatedTags() {
final StringBuilder sb = new StringBuilder();
for (Entry<String, String> tag : this.tags.entrySet()) {
Expand Down Expand Up @@ -351,4 +364,13 @@ private StringBuilder formatedTime() {
return sb;
}

public static String toLineProtocol(List<Point> points) {
StringBuilder sb = new StringBuilder();
for (Point point : points) {
sb.append(point.lineProtocol()).append("\n");
}
return sb.toString();
}


}
Loading