Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parameter binding in InfluxQL (influxdata/influxdb-java#274) #429

Merged
merged 11 commits into from
Mar 21, 2018
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## 2.10 [unreleased]

### Features

- Support for parameter binding in queries ("prepared statements") [PR #429](https://github.com/influxdata/influxdb-java/pull/429)

## 2.9 [2018-02-27]

### Features
Expand Down
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,22 @@ this.influxDB.query(new Query("SELECT idle FROM cpu", dbName), queryResult -> {
});
```

#### Query using parameter binding ("prepared statements", version 2.10+ required)

If your Query is based on user input, it is good practice to use parameter binding to avoid [injection attacks](https://en.wikipedia.org/wiki/SQL_injection).
You can create queries with parameter binding with the help of the QueryBuilder:

```java
Query query = QueryBuilder.newQuery("SELECT * FROM cpu WHERE idle > $idle AND system > $system")
.forDatabase(dbName)
.bind("idle", 90)
.bind("system", 5)
.create();
QueryResult results = influxDB.query(query);
```

The values of the bind() calls are bound to the placeholders in the query ($idle, $system).

#### Batch flush interval jittering (version 2.9+ required)

When using large number of influxdb-java clients against a single server it may happen that all the clients
Expand Down
101 changes: 101 additions & 0 deletions src/main/java/org/influxdb/dto/BoundParameterQuery.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package org.influxdb.dto;

import com.squareup.moshi.JsonWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.influxdb.InfluxDBIOException;

import okio.Buffer;

public final class BoundParameterQuery extends Query {

private final Map<String, Object> params = new HashMap<>();

private BoundParameterQuery(final String command, final String database) {
super(command, database, true);
}

public String getParameterJsonWithUrlEncoded() {
try {
String jsonParameterObject = createJsonObject(params);
String urlEncodedJsonParameterObject = encode(jsonParameterObject);
return urlEncodedJsonParameterObject;
} catch (IOException e) {
throw new InfluxDBIOException(e);
}
}

private String createJsonObject(final Map<String, Object> parameterMap) throws IOException {
Buffer b = new Buffer();
JsonWriter writer = JsonWriter.of(b);
writer.beginObject();
for (Entry<String, Object> pair : parameterMap.entrySet()) {
String name = pair.getKey();
Object value = pair.getValue();
if (value instanceof Number) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The moshi library has a specific behavior when deserializing numbers: #153 (comment)

I hope casting to Number is enough to keep the data type on the JSON object being created here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a look at the moshi code and tested it. This works as intended.

Number number = (Number) value;
writer.name(name).value(number);
} else if (value instanceof String) {
writer.name(name).value((String) value);
} else if (value instanceof Boolean) {
writer.name(name).value((Boolean) value);
} else {
writer.name(name).value(String.valueOf(value));
}
}
writer.endObject();
return b.readString(Charset.forName("utf-8"));
}

@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + params.hashCode();
return result;
}

@Override
public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (!super.equals(obj)) {
return false;
}
BoundParameterQuery other = (BoundParameterQuery) obj;
if (!params.equals(other.params)) {
return false;
}
return true;
}

public static class QueryBuilder {
private BoundParameterQuery query;
private String influxQL;

public static QueryBuilder newQuery(final String influxQL) {
QueryBuilder instance = new QueryBuilder();
instance.influxQL = influxQL;
return instance;
}

public QueryBuilder forDatabase(final String database) {
query = new BoundParameterQuery(influxQL, database);
return this;
}

public QueryBuilder bind(final String placeholder, final Object value) {
query.params.put(placeholder, value);
return this;
}

public BoundParameterQuery create() {
return query;
}
}
}
43 changes: 34 additions & 9 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.influxdb.InfluxDBException;
import org.influxdb.InfluxDBIOException;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.BoundParameterQuery;
import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
Expand Down Expand Up @@ -454,8 +455,16 @@ public void query(final Query query, final int chunkSize, final Consumer<QueryRe
throw new UnsupportedOperationException("chunking not supported");
}

Call<ResponseBody> call = this.influxDBService.query(this.username, this.password,
query.getDatabase(), query.getCommandWithUrlEncoded(), chunkSize);
Call<ResponseBody> call = null;
if (query instanceof BoundParameterQuery) {
BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query;
call = this.influxDBService.query(this.username, this.password,
query.getDatabase(), query.getCommandWithUrlEncoded(), chunkSize,
boundParameterQuery.getParameterJsonWithUrlEncoded());
} else {
call = this.influxDBService.query(this.username, this.password,
query.getDatabase(), query.getCommandWithUrlEncoded(), chunkSize);
}

call.enqueue(new Callback<ResponseBody>() {
@Override
Expand Down Expand Up @@ -496,8 +505,17 @@ public void onFailure(final Call<ResponseBody> call, final Throwable t) {
*/
@Override
public QueryResult query(final Query query, final TimeUnit timeUnit) {
return execute(this.influxDBService.query(this.username, this.password, query.getDatabase(),
TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded()));
Call<QueryResult> call = null;
if (query instanceof BoundParameterQuery) {
BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query;
call = this.influxDBService.query(this.username, this.password, query.getDatabase(),
TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded(),
boundParameterQuery.getParameterJsonWithUrlEncoded());
} else {
call = this.influxDBService.query(this.username, this.password, query.getDatabase(),
TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded());
}
return execute(call);
}

/**
Expand Down Expand Up @@ -560,12 +578,19 @@ public boolean databaseExists(final String name) {
*/
private Call<QueryResult> callQuery(final Query query) {
Call<QueryResult> call;
if (query.requiresPost()) {
call = this.influxDBService.postQuery(this.username,
this.password, query.getDatabase(), query.getCommandWithUrlEncoded());
if (query instanceof BoundParameterQuery) {
BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query;
call = this.influxDBService.postQuery(this.username,
this.password, query.getDatabase(), query.getCommandWithUrlEncoded(),
boundParameterQuery.getParameterJsonWithUrlEncoded());
} else {
call = this.influxDBService.query(this.username,
this.password, query.getDatabase(), query.getCommandWithUrlEncoded());
if (query.requiresPost()) {
call = this.influxDBService.postQuery(this.username,
this.password, query.getDatabase(), query.getCommandWithUrlEncoded());
} else {
call = this.influxDBService.query(this.username,
this.password, query.getDatabase(), query.getCommandWithUrlEncoded());
}
}
return call;
}
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/influxdb/impl/InfluxDBService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ interface InfluxDBService {
public static final String Q = "q";
public static final String DB = "db";
public static final String RP = "rp";
public static final String PARAMS = "params";
public static final String PRECISION = "precision";
public static final String CONSISTENCY = "consistency";
public static final String EPOCH = "epoch";
Expand Down Expand Up @@ -47,6 +48,11 @@ public Call<ResponseBody> writePoints(@Query(U) String username,
public Call<QueryResult> query(@Query(U) String username, @Query(P) String password, @Query(DB) String db,
@Query(EPOCH) String epoch, @Query(value = Q, encoded = true) String query);

@POST("/query")
public Call<QueryResult> query(@Query(U) String username, @Query(P) String password, @Query(DB) String db,
@Query(EPOCH) String epoch, @Query(value = Q, encoded = true) String query,
@Query(value = PARAMS, encoded = true) String params);

@GET("/query")
public Call<QueryResult> query(@Query(U) String username, @Query(P) String password, @Query(DB) String db,
@Query(value = Q, encoded = true) String query);
Expand All @@ -55,6 +61,10 @@ public Call<QueryResult> query(@Query(U) String username, @Query(P) String passw
public Call<QueryResult> postQuery(@Query(U) String username, @Query(P) String password, @Query(DB) String db,
@Query(value = Q, encoded = true) String query);

@POST("/query")
public Call<QueryResult> postQuery(@Query(U) String username, @Query(P) String password, @Query(DB) String db,
@Query(value = Q, encoded = true) String query, @Query(value = PARAMS, encoded = true) String params);

@GET("/query")
public Call<QueryResult> query(@Query(U) String username, @Query(P) String password,
@Query(value = Q, encoded = true) String query);
Expand All @@ -68,4 +78,10 @@ public Call<QueryResult> postQuery(@Query(U) String username,
public Call<ResponseBody> query(@Query(U) String username,
@Query(P) String password, @Query(DB) String db, @Query(value = Q, encoded = true) String query,
@Query(CHUNK_SIZE) int chunkSize);

@Streaming
@POST("/query?chunked=true")
public Call<ResponseBody> query(@Query(U) String username,
@Query(P) String password, @Query(DB) String db, @Query(value = Q, encoded = true) String query,
@Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params);
}
45 changes: 45 additions & 0 deletions src/test/java/org/influxdb/InfluxDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import org.influxdb.InfluxDB.LogLevel;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.BoundParameterQuery.QueryBuilder;
import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.dto.QueryResult.Series;
import org.influxdb.impl.InfluxDBImpl;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -89,6 +91,49 @@ public void testQuery() {
this.influxDB.query(new Query("DROP DATABASE mydb2", "mydb"));
}

@Test
public void testBoundParameterQuery() throws InterruptedException {
// set up
Point point = Point
.measurement("cpu")
.tag("atag", "test")
.addField("idle", 90L)
.addField("usertime", 9L)
.addField("system", 1L)
.build();
this.influxDB.setDatabase(UDP_DATABASE);
this.influxDB.write(point);

// test
Query query = QueryBuilder.newQuery("SELECT * FROM cpu WHERE atag = $atag")
.forDatabase(UDP_DATABASE)
.bind("atag", "test")
.create();
QueryResult result = this.influxDB.query(query);
Assertions.assertTrue(result.getResults().get(0).getSeries().size() == 1);
Series series = result.getResults().get(0).getSeries().get(0);
Assertions.assertTrue(series.getValues().size() == 1);

result = this.influxDB.query(query, TimeUnit.SECONDS);
Assertions.assertTrue(result.getResults().get(0).getSeries().size() == 1);
series = result.getResults().get(0).getSeries().get(0);
Assertions.assertTrue(series.getValues().size() == 1);

Object waitForTestresults = new Object();
Consumer<QueryResult> check = (queryResult) -> {
Assertions.assertTrue(queryResult.getResults().get(0).getSeries().size() == 1);
Series s = queryResult.getResults().get(0).getSeries().get(0);
Assertions.assertTrue(s.getValues().size() == 1);
synchronized (waitForTestresults) {
waitForTestresults.notifyAll();
}
};
this.influxDB.query(query, 10, check);
synchronized (waitForTestresults) {
waitForTestresults.wait(2000);
}
}

/**
* Tests for callback query.
*/
Expand Down
Loading