Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support generic POJO super classes #980

Merged
merged 3 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions MANUAL.md
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,25 @@ influxDB.write(dbName, rpName, point);

An alternative way to create InfluxDB queries is available. By using the [QueryBuilder](QUERY_BUILDER.md) you can create queries using java instead of providing the influxdb queries as strings.

#### Generic POJO super classes

POJO classes can have generic super classes, for cases where multiple measurements have a similar structure, and differ by type(s), as in:

```java
public class SuperMeasurement<T> {
@Column
@TimeColumn
private Instant time;
@Column
T value;
// Other common columns and tags
}

public class SubMeasurement extends SuperMeasurement<String> {
// Any specific columns and tags
}
```

### InfluxDBMapper

In case you want to save and load data using models you can use the [InfluxDBMapper](INFLUXDB_MAPPER.md).
Expand Down
28 changes: 22 additions & 6 deletions src/main/java/org/influxdb/dto/Point.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
import org.influxdb.annotation.Measurement;
import org.influxdb.annotation.TimeColumn;
import org.influxdb.impl.Preconditions;
import org.influxdb.impl.TypeMapper;

import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
Expand Down Expand Up @@ -286,6 +289,8 @@ public Builder addFieldsFromPOJO(final Object pojo) {

while (clazz != null) {

TypeMapper typeMapper = TypeMapper.empty();
while (clazz != null) {
for (Field field : clazz.getDeclaredFields()) {

Column column = field.getAnnotation(Column.class);
Expand All @@ -304,10 +309,20 @@ public Builder addFieldsFromPOJO(final Object pojo) {
fieldName = field.getName();
}

addFieldByAttribute(pojo, field, column != null && column.tag(), fieldName);
addFieldByAttribute(pojo, field, column != null && column.tag(), fieldName, typeMapper);
}

Class<?> superclass = clazz.getSuperclass();
Type genericSuperclass = clazz.getGenericSuperclass();
if (genericSuperclass instanceof ParameterizedType) {
typeMapper = TypeMapper.of((ParameterizedType) genericSuperclass, superclass);
} else {
typeMapper = TypeMapper.empty();
}
clazz = clazz.getSuperclass();

clazz = superclass;
}
}

if (this.fields.isEmpty()) {
throw new BuilderException("Class " + pojo.getClass().getName()
Expand All @@ -318,13 +333,14 @@ public Builder addFieldsFromPOJO(final Object pojo) {
}

private void addFieldByAttribute(final Object pojo, final Field field, final boolean tag,
final String fieldName) {
final String fieldName, final TypeMapper typeMapper) {
try {
Object fieldValue = field.get(pojo);

TimeColumn tc = field.getAnnotation(TimeColumn.class);
Class<?> fieldType = (Class<?>) typeMapper.resolve(field.getGenericType());
if (tc != null) {
if (Instant.class.isAssignableFrom(field.getType())) {
if (Instant.class.isAssignableFrom(fieldType)) {
Optional.ofNullable((Instant) fieldValue).ifPresent(instant -> {
TimeUnit timeUnit = tc.timeUnit();
if (timeUnit == TimeUnit.NANOSECONDS || timeUnit == TimeUnit.MICROSECONDS) {
Expand All @@ -341,7 +357,7 @@ private void addFieldByAttribute(final Object pojo, final Field field, final boo
}

throw new InfluxDBMapperException(
"Unsupported type " + field.getType() + " for time: should be of Instant type");
"Unsupported type " + fieldType + " for time: should be of Instant type");
}

if (tag) {
Expand All @@ -350,7 +366,7 @@ private void addFieldByAttribute(final Object pojo, final Field field, final boo
}
} else {
if (fieldValue != null) {
setField(field.getType(), fieldName, fieldValue);
setField(fieldType, fieldName, fieldValue);
}
}

Expand Down
185 changes: 83 additions & 102 deletions src/main/java/org/influxdb/impl/InfluxDBResultMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
Expand All @@ -50,8 +52,12 @@ public class InfluxDBResultMapper {
/**
* Data structure used to cache classes used as measurements.
*/
private static class ClassInfo {
ConcurrentMap<String, Field> fieldMap;
ConcurrentMap<Field, TypeMapper> typeMappers;
}
private static final
ConcurrentMap<String, ConcurrentMap<String, Field>> CLASS_FIELD_CACHE = new ConcurrentHashMap<>();
ConcurrentMap<String, ClassInfo> CLASS_INFO_CACHE = new ConcurrentHashMap<>();

private static final int FRACTION_MIN_WIDTH = 0;
private static final int FRACTION_MAX_WIDTH = 9;
Expand Down Expand Up @@ -204,21 +210,19 @@ void throwExceptionIfResultWithError(final QueryResult queryResult) {
});
}

ConcurrentMap<String, Field> getColNameAndFieldMap(final Class<?> clazz) {
return CLASS_FIELD_CACHE.get(clazz.getName());
}

void cacheMeasurementClass(final Class<?>... classVarAgrs) {
for (Class<?> clazz : classVarAgrs) {
if (CLASS_FIELD_CACHE.containsKey(clazz.getName())) {
if (CLASS_INFO_CACHE.containsKey(clazz.getName())) {
continue;
}
ConcurrentMap<String, Field> influxColumnAndFieldMap = new ConcurrentHashMap<>();
ConcurrentMap<String, Field> fieldMap = new ConcurrentHashMap<>();
ConcurrentMap<Field, TypeMapper> typeMappers = new ConcurrentHashMap<>();

Measurement measurement = clazz.getAnnotation(Measurement.class);
boolean allFields = measurement != null && measurement.allFields();

Class<?> c = clazz;
TypeMapper typeMapper = TypeMapper.empty();
while (c != null) {
for (Field field : c.getDeclaredFields()) {
Column colAnnotation = field.getAnnotation(Column.class);
Expand All @@ -227,11 +231,25 @@ void cacheMeasurementClass(final Class<?>... classVarAgrs) {
continue;
}

influxColumnAndFieldMap.put(getFieldName(field, colAnnotation), field);
fieldMap.put(getFieldName(field, colAnnotation), field);
typeMappers.put(field, typeMapper);
}
c = c.getSuperclass();

Class<?> superclass = c.getSuperclass();
Type genericSuperclass = c.getGenericSuperclass();
if (genericSuperclass instanceof ParameterizedType) {
typeMapper = TypeMapper.of((ParameterizedType) genericSuperclass, superclass);
} else {
typeMapper = TypeMapper.empty();
}

c = superclass;
}
CLASS_FIELD_CACHE.putIfAbsent(clazz.getName(), influxColumnAndFieldMap);

ClassInfo classInfo = new ClassInfo();
classInfo.fieldMap = fieldMap;
classInfo.typeMappers = typeMappers;
CLASS_INFO_CACHE.putIfAbsent(clazz.getName(), classInfo);
}
}

Expand All @@ -255,28 +273,26 @@ String getRetentionPolicy(final Class<?> clazz) {
return ((Measurement) clazz.getAnnotation(Measurement.class)).retentionPolicy();
}

TimeUnit getTimeUnit(final Class<?> clazz) {
return ((Measurement) clazz.getAnnotation(Measurement.class)).timeUnit();
}

<T> List<T> parseSeriesAs(final QueryResult.Series series, final Class<T> clazz, final List<T> result) {
return parseSeriesAs(series, clazz, result, TimeUnit.MILLISECONDS);
}

<T> List<T> parseSeriesAs(final QueryResult.Series series, final Class<T> clazz, final List<T> result,
final TimeUnit precision) {
int columnSize = series.getColumns().size();
ConcurrentMap<String, Field> colNameAndFieldMap = CLASS_FIELD_CACHE.get(clazz.getName());

ClassInfo classInfo = CLASS_INFO_CACHE.get(clazz.getName());
try {
T object = null;
for (List<Object> row : series.getValues()) {
for (int i = 0; i < columnSize; i++) {
Field correspondingField = colNameAndFieldMap.get(series.getColumns().get(i)/*InfluxDB columnName*/);
Field correspondingField = classInfo.fieldMap.get(series.getColumns().get(i)/*InfluxDB columnName*/);
if (correspondingField != null) {
if (object == null) {
object = clazz.newInstance();
}
setFieldValue(object, correspondingField, row.get(i), precision);
setFieldValue(object, correspondingField, row.get(i), precision,
classInfo.typeMappers.get(correspondingField));
}
}
// When the "GROUP BY" clause is used, "tags" are returned as Map<String,String> and
Expand All @@ -285,10 +301,11 @@ <T> List<T> parseSeriesAs(final QueryResult.Series series, final Class<T> clazz,
// "tag" values are always String.
if (series.getTags() != null && !series.getTags().isEmpty()) {
for (Entry<String, String> entry : series.getTags().entrySet()) {
Field correspondingField = colNameAndFieldMap.get(entry.getKey()/*InfluxDB columnName*/);
Field correspondingField = classInfo.fieldMap.get(entry.getKey()/*InfluxDB columnName*/);
if (correspondingField != null) {
// I don't think it is possible to reach here without a valid "object"
setFieldValue(object, correspondingField, entry.getValue(), precision);
setFieldValue(object, correspondingField, entry.getValue(), precision,
classInfo.typeMappers.get(correspondingField));
}
}
}
Expand All @@ -309,104 +326,68 @@ <T> List<T> parseSeriesAs(final QueryResult.Series series, final Class<T> clazz,
* for more information.
*
*/
private static <T> void setFieldValue(final T object, final Field field, final Object value, final TimeUnit precision)
private static <T> void setFieldValue(final T object, final Field field, final Object value, final TimeUnit precision,
final TypeMapper typeMapper)
throws IllegalArgumentException, IllegalAccessException {
if (value == null) {
return;
}
Class<?> fieldType = field.getType();
Type fieldType = typeMapper.resolve(field.getGenericType());
if (!field.isAccessible()) {
field.setAccessible(true);
}
field.set(object, adaptValue((Class<?>) fieldType, value, precision, field.getName(), object.getClass().getName()));
}

private static Object adaptValue(final Class<?> fieldType, final Object value, final TimeUnit precision,
final String fieldName, final String className) {
try {
if (!field.isAccessible()) {
field.setAccessible(true);
if (String.class.isAssignableFrom(fieldType)) {
return String.valueOf(value);
}
if (Instant.class.isAssignableFrom(fieldType)) {
if (value instanceof String) {
return Instant.from(RFC3339_FORMATTER.parse(String.valueOf(value)));
}
if (value instanceof Long) {
return Instant.ofEpochMilli(toMillis((long) value, precision));
}
if (value instanceof Double) {
return Instant.ofEpochMilli(toMillis(((Double) value).longValue(), precision));
}
if (value instanceof Integer) {
return Instant.ofEpochMilli(toMillis(((Integer) value).longValue(), precision));
}
throw new InfluxDBMapperException("Unsupported type " + fieldType + " for field " + fieldName);
}
if (fieldValueModified(fieldType, field, object, value, precision)
|| fieldValueForPrimitivesModified(fieldType, field, object, value)
|| fieldValueForPrimitiveWrappersModified(fieldType, field, object, value)) {
return;
if (Double.class.isAssignableFrom(fieldType) || double.class.isAssignableFrom(fieldType)) {
return value;
}
if (Long.class.isAssignableFrom(fieldType) || long.class.isAssignableFrom(fieldType)) {
return ((Double) value).longValue();
}
if (Integer.class.isAssignableFrom(fieldType) || int.class.isAssignableFrom(fieldType)) {
return ((Double) value).intValue();
}
if (Boolean.class.isAssignableFrom(fieldType) || boolean.class.isAssignableFrom(fieldType)) {
return Boolean.valueOf(String.valueOf(value));
}
if (Enum.class.isAssignableFrom(fieldType)) {
//noinspection unchecked
return Enum.valueOf((Class<Enum>) fieldType, String.valueOf(value));
}
String msg = "Class '%s' field '%s' is from an unsupported type '%s'.";
throw new InfluxDBMapperException(
String.format(msg, object.getClass().getName(), field.getName(), field.getType()));
} catch (ClassCastException e) {
String msg = "Class '%s' field '%s' was defined with a different field type and caused a ClassCastException. "
+ "The correct type is '%s' (current field value: '%s').";
throw new InfluxDBMapperException(
String.format(msg, object.getClass().getName(), field.getName(), value.getClass().getName(), value));
}
}

static <T> boolean fieldValueModified(final Class<?> fieldType, final Field field, final T object, final Object value,
final TimeUnit precision)
throws IllegalArgumentException, IllegalAccessException {
if (String.class.isAssignableFrom(fieldType)) {
field.set(object, String.valueOf(value));
return true;
}
if (Instant.class.isAssignableFrom(fieldType)) {
Instant instant;
if (value instanceof String) {
instant = Instant.from(RFC3339_FORMATTER.parse(String.valueOf(value)));
} else if (value instanceof Long) {
instant = Instant.ofEpochMilli(toMillis((long) value, precision));
} else if (value instanceof Double) {
instant = Instant.ofEpochMilli(toMillis(((Double) value).longValue(), precision));
} else if (value instanceof Integer) {
instant = Instant.ofEpochMilli(toMillis(((Integer) value).longValue(), precision));
} else {
throw new InfluxDBMapperException("Unsupported type " + field.getClass() + " for field " + field.getName());
}
field.set(object, instant);
return true;
}
return false;
}

static <T> boolean fieldValueForPrimitivesModified(final Class<?> fieldType, final Field field, final T object,
final Object value)
throws IllegalArgumentException, IllegalAccessException {
if (double.class.isAssignableFrom(fieldType)) {
field.setDouble(object, ((Double) value).doubleValue());
return true;
}
if (long.class.isAssignableFrom(fieldType)) {
field.setLong(object, ((Double) value).longValue());
return true;
}
if (int.class.isAssignableFrom(fieldType)) {
field.setInt(object, ((Double) value).intValue());
return true;
String.format(msg, className, fieldName, value.getClass().getName(), value));
}
if (boolean.class.isAssignableFrom(fieldType)) {
field.setBoolean(object, Boolean.valueOf(String.valueOf(value)).booleanValue());
return true;
}
return false;
}

static <T> boolean fieldValueForPrimitiveWrappersModified(final Class<?> fieldType, final Field field, final T object,
final Object value)
throws IllegalArgumentException, IllegalAccessException {
if (Double.class.isAssignableFrom(fieldType)) {
field.set(object, value);
return true;
}
if (Long.class.isAssignableFrom(fieldType)) {
field.set(object, Long.valueOf(((Double) value).longValue()));
return true;
}
if (Integer.class.isAssignableFrom(fieldType)) {
field.set(object, Integer.valueOf(((Double) value).intValue()));
return true;
}
if (Boolean.class.isAssignableFrom(fieldType)) {
field.set(object, Boolean.valueOf(String.valueOf(value)));
return true;
}
return false;
throw new InfluxDBMapperException(
String.format("Class '%s' field '%s' is from an unsupported type '%s'.", className, fieldName, fieldType));
}

private static Long toMillis(final long value, final TimeUnit precision) {

private static long toMillis(final long value, final TimeUnit precision) {
return TimeUnit.MILLISECONDS.convert(value, precision);
}
}
Loading
Loading