Skip to content

Commit

Permalink
Support generic POJO super classes (#980)
Browse files Browse the repository at this point in the history
* Support generic POJO super classes

* Add generic POJO super classes to MANUAL.md

* Clean up TypeMapper
  • Loading branch information
eranl authored Dec 13, 2023
1 parent f324d22 commit 4b4dc44
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 108 deletions.
19 changes: 19 additions & 0 deletions MANUAL.md
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,25 @@ influxDB.write(dbName, rpName, point);

An alternative way to create InfluxDB queries is available. By using the [QueryBuilder](QUERY_BUILDER.md) you can create queries using java instead of providing the influxdb queries as strings.

#### Generic POJO super classes

POJO classes can have generic super classes, for cases where multiple measurements have a similar structure, and differ by type(s), as in:

```java
public class SuperMeasurement<T> {
@Column
@TimeColumn
private Instant time;
@Column
T value;
// Other common columns and tags
}

public class SubMeasurement extends SuperMeasurement<String> {
// Any specific columns and tags
}
```

### InfluxDBMapper

In case you want to save and load data using models you can use the [InfluxDBMapper](INFLUXDB_MAPPER.md).
Expand Down
28 changes: 22 additions & 6 deletions src/main/java/org/influxdb/dto/Point.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
import org.influxdb.annotation.Measurement;
import org.influxdb.annotation.TimeColumn;
import org.influxdb.impl.Preconditions;
import org.influxdb.impl.TypeMapper;

import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
Expand Down Expand Up @@ -286,6 +289,8 @@ public Builder addFieldsFromPOJO(final Object pojo) {

while (clazz != null) {

TypeMapper typeMapper = TypeMapper.empty();
while (clazz != null) {
for (Field field : clazz.getDeclaredFields()) {

Column column = field.getAnnotation(Column.class);
Expand All @@ -304,10 +309,20 @@ public Builder addFieldsFromPOJO(final Object pojo) {
fieldName = field.getName();
}

addFieldByAttribute(pojo, field, column != null && column.tag(), fieldName);
addFieldByAttribute(pojo, field, column != null && column.tag(), fieldName, typeMapper);
}

Class<?> superclass = clazz.getSuperclass();
Type genericSuperclass = clazz.getGenericSuperclass();
if (genericSuperclass instanceof ParameterizedType) {
typeMapper = TypeMapper.of((ParameterizedType) genericSuperclass, superclass);
} else {
typeMapper = TypeMapper.empty();
}
clazz = clazz.getSuperclass();

clazz = superclass;
}
}

if (this.fields.isEmpty()) {
throw new BuilderException("Class " + pojo.getClass().getName()
Expand All @@ -318,13 +333,14 @@ public Builder addFieldsFromPOJO(final Object pojo) {
}

private void addFieldByAttribute(final Object pojo, final Field field, final boolean tag,
final String fieldName) {
final String fieldName, final TypeMapper typeMapper) {
try {
Object fieldValue = field.get(pojo);

TimeColumn tc = field.getAnnotation(TimeColumn.class);
Class<?> fieldType = (Class<?>) typeMapper.resolve(field.getGenericType());
if (tc != null) {
if (Instant.class.isAssignableFrom(field.getType())) {
if (Instant.class.isAssignableFrom(fieldType)) {
Optional.ofNullable((Instant) fieldValue).ifPresent(instant -> {
TimeUnit timeUnit = tc.timeUnit();
if (timeUnit == TimeUnit.NANOSECONDS || timeUnit == TimeUnit.MICROSECONDS) {
Expand All @@ -341,7 +357,7 @@ private void addFieldByAttribute(final Object pojo, final Field field, final boo
}

throw new InfluxDBMapperException(
"Unsupported type " + field.getType() + " for time: should be of Instant type");
"Unsupported type " + fieldType + " for time: should be of Instant type");
}

if (tag) {
Expand All @@ -350,7 +366,7 @@ private void addFieldByAttribute(final Object pojo, final Field field, final boo
}
} else {
if (fieldValue != null) {
setField(field.getType(), fieldName, fieldValue);
setField(fieldType, fieldName, fieldValue);
}
}

Expand Down
185 changes: 83 additions & 102 deletions src/main/java/org/influxdb/impl/InfluxDBResultMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
Expand All @@ -50,8 +52,12 @@ public class InfluxDBResultMapper {
/**
* Data structure used to cache classes used as measurements.
*/
private static class ClassInfo {
ConcurrentMap<String, Field> fieldMap;
ConcurrentMap<Field, TypeMapper> typeMappers;
}
private static final
ConcurrentMap<String, ConcurrentMap<String, Field>> CLASS_FIELD_CACHE = new ConcurrentHashMap<>();
ConcurrentMap<String, ClassInfo> CLASS_INFO_CACHE = new ConcurrentHashMap<>();

private static final int FRACTION_MIN_WIDTH = 0;
private static final int FRACTION_MAX_WIDTH = 9;
Expand Down Expand Up @@ -204,21 +210,19 @@ void throwExceptionIfResultWithError(final QueryResult queryResult) {
});
}

ConcurrentMap<String, Field> getColNameAndFieldMap(final Class<?> clazz) {
return CLASS_FIELD_CACHE.get(clazz.getName());
}

void cacheMeasurementClass(final Class<?>... classVarAgrs) {
for (Class<?> clazz : classVarAgrs) {
if (CLASS_FIELD_CACHE.containsKey(clazz.getName())) {
if (CLASS_INFO_CACHE.containsKey(clazz.getName())) {
continue;
}
ConcurrentMap<String, Field> influxColumnAndFieldMap = new ConcurrentHashMap<>();
ConcurrentMap<String, Field> fieldMap = new ConcurrentHashMap<>();
ConcurrentMap<Field, TypeMapper> typeMappers = new ConcurrentHashMap<>();

Measurement measurement = clazz.getAnnotation(Measurement.class);
boolean allFields = measurement != null && measurement.allFields();

Class<?> c = clazz;
TypeMapper typeMapper = TypeMapper.empty();
while (c != null) {
for (Field field : c.getDeclaredFields()) {
Column colAnnotation = field.getAnnotation(Column.class);
Expand All @@ -227,11 +231,25 @@ void cacheMeasurementClass(final Class<?>... classVarAgrs) {
continue;
}

influxColumnAndFieldMap.put(getFieldName(field, colAnnotation), field);
fieldMap.put(getFieldName(field, colAnnotation), field);
typeMappers.put(field, typeMapper);
}
c = c.getSuperclass();

Class<?> superclass = c.getSuperclass();
Type genericSuperclass = c.getGenericSuperclass();
if (genericSuperclass instanceof ParameterizedType) {
typeMapper = TypeMapper.of((ParameterizedType) genericSuperclass, superclass);
} else {
typeMapper = TypeMapper.empty();
}

c = superclass;
}
CLASS_FIELD_CACHE.putIfAbsent(clazz.getName(), influxColumnAndFieldMap);

ClassInfo classInfo = new ClassInfo();
classInfo.fieldMap = fieldMap;
classInfo.typeMappers = typeMappers;
CLASS_INFO_CACHE.putIfAbsent(clazz.getName(), classInfo);
}
}

Expand All @@ -255,28 +273,26 @@ String getRetentionPolicy(final Class<?> clazz) {
return ((Measurement) clazz.getAnnotation(Measurement.class)).retentionPolicy();
}

TimeUnit getTimeUnit(final Class<?> clazz) {
return ((Measurement) clazz.getAnnotation(Measurement.class)).timeUnit();
}

<T> List<T> parseSeriesAs(final QueryResult.Series series, final Class<T> clazz, final List<T> result) {
return parseSeriesAs(series, clazz, result, TimeUnit.MILLISECONDS);
}

<T> List<T> parseSeriesAs(final QueryResult.Series series, final Class<T> clazz, final List<T> result,
final TimeUnit precision) {
int columnSize = series.getColumns().size();
ConcurrentMap<String, Field> colNameAndFieldMap = CLASS_FIELD_CACHE.get(clazz.getName());

ClassInfo classInfo = CLASS_INFO_CACHE.get(clazz.getName());
try {
T object = null;
for (List<Object> row : series.getValues()) {
for (int i = 0; i < columnSize; i++) {
Field correspondingField = colNameAndFieldMap.get(series.getColumns().get(i)/*InfluxDB columnName*/);
Field correspondingField = classInfo.fieldMap.get(series.getColumns().get(i)/*InfluxDB columnName*/);
if (correspondingField != null) {
if (object == null) {
object = clazz.newInstance();
}
setFieldValue(object, correspondingField, row.get(i), precision);
setFieldValue(object, correspondingField, row.get(i), precision,
classInfo.typeMappers.get(correspondingField));
}
}
// When the "GROUP BY" clause is used, "tags" are returned as Map<String,String> and
Expand All @@ -285,10 +301,11 @@ <T> List<T> parseSeriesAs(final QueryResult.Series series, final Class<T> clazz,
// "tag" values are always String.
if (series.getTags() != null && !series.getTags().isEmpty()) {
for (Entry<String, String> entry : series.getTags().entrySet()) {
Field correspondingField = colNameAndFieldMap.get(entry.getKey()/*InfluxDB columnName*/);
Field correspondingField = classInfo.fieldMap.get(entry.getKey()/*InfluxDB columnName*/);
if (correspondingField != null) {
// I don't think it is possible to reach here without a valid "object"
setFieldValue(object, correspondingField, entry.getValue(), precision);
setFieldValue(object, correspondingField, entry.getValue(), precision,
classInfo.typeMappers.get(correspondingField));
}
}
}
Expand All @@ -309,104 +326,68 @@ <T> List<T> parseSeriesAs(final QueryResult.Series series, final Class<T> clazz,
* for more information.
*
*/
private static <T> void setFieldValue(final T object, final Field field, final Object value, final TimeUnit precision)
private static <T> void setFieldValue(final T object, final Field field, final Object value, final TimeUnit precision,
final TypeMapper typeMapper)
throws IllegalArgumentException, IllegalAccessException {
if (value == null) {
return;
}
Class<?> fieldType = field.getType();
Type fieldType = typeMapper.resolve(field.getGenericType());
if (!field.isAccessible()) {
field.setAccessible(true);
}
field.set(object, adaptValue((Class<?>) fieldType, value, precision, field.getName(), object.getClass().getName()));
}

private static Object adaptValue(final Class<?> fieldType, final Object value, final TimeUnit precision,
final String fieldName, final String className) {
try {
if (!field.isAccessible()) {
field.setAccessible(true);
if (String.class.isAssignableFrom(fieldType)) {
return String.valueOf(value);
}
if (Instant.class.isAssignableFrom(fieldType)) {
if (value instanceof String) {
return Instant.from(RFC3339_FORMATTER.parse(String.valueOf(value)));
}
if (value instanceof Long) {
return Instant.ofEpochMilli(toMillis((long) value, precision));
}
if (value instanceof Double) {
return Instant.ofEpochMilli(toMillis(((Double) value).longValue(), precision));
}
if (value instanceof Integer) {
return Instant.ofEpochMilli(toMillis(((Integer) value).longValue(), precision));
}
throw new InfluxDBMapperException("Unsupported type " + fieldType + " for field " + fieldName);
}
if (fieldValueModified(fieldType, field, object, value, precision)
|| fieldValueForPrimitivesModified(fieldType, field, object, value)
|| fieldValueForPrimitiveWrappersModified(fieldType, field, object, value)) {
return;
if (Double.class.isAssignableFrom(fieldType) || double.class.isAssignableFrom(fieldType)) {
return value;
}
if (Long.class.isAssignableFrom(fieldType) || long.class.isAssignableFrom(fieldType)) {
return ((Double) value).longValue();
}
if (Integer.class.isAssignableFrom(fieldType) || int.class.isAssignableFrom(fieldType)) {
return ((Double) value).intValue();
}
if (Boolean.class.isAssignableFrom(fieldType) || boolean.class.isAssignableFrom(fieldType)) {
return Boolean.valueOf(String.valueOf(value));
}
if (Enum.class.isAssignableFrom(fieldType)) {
//noinspection unchecked
return Enum.valueOf((Class<Enum>) fieldType, String.valueOf(value));
}
String msg = "Class '%s' field '%s' is from an unsupported type '%s'.";
throw new InfluxDBMapperException(
String.format(msg, object.getClass().getName(), field.getName(), field.getType()));
} catch (ClassCastException e) {
String msg = "Class '%s' field '%s' was defined with a different field type and caused a ClassCastException. "
+ "The correct type is '%s' (current field value: '%s').";
throw new InfluxDBMapperException(
String.format(msg, object.getClass().getName(), field.getName(), value.getClass().getName(), value));
}
}

static <T> boolean fieldValueModified(final Class<?> fieldType, final Field field, final T object, final Object value,
final TimeUnit precision)
throws IllegalArgumentException, IllegalAccessException {
if (String.class.isAssignableFrom(fieldType)) {
field.set(object, String.valueOf(value));
return true;
}
if (Instant.class.isAssignableFrom(fieldType)) {
Instant instant;
if (value instanceof String) {
instant = Instant.from(RFC3339_FORMATTER.parse(String.valueOf(value)));
} else if (value instanceof Long) {
instant = Instant.ofEpochMilli(toMillis((long) value, precision));
} else if (value instanceof Double) {
instant = Instant.ofEpochMilli(toMillis(((Double) value).longValue(), precision));
} else if (value instanceof Integer) {
instant = Instant.ofEpochMilli(toMillis(((Integer) value).longValue(), precision));
} else {
throw new InfluxDBMapperException("Unsupported type " + field.getClass() + " for field " + field.getName());
}
field.set(object, instant);
return true;
}
return false;
}

static <T> boolean fieldValueForPrimitivesModified(final Class<?> fieldType, final Field field, final T object,
final Object value)
throws IllegalArgumentException, IllegalAccessException {
if (double.class.isAssignableFrom(fieldType)) {
field.setDouble(object, ((Double) value).doubleValue());
return true;
}
if (long.class.isAssignableFrom(fieldType)) {
field.setLong(object, ((Double) value).longValue());
return true;
}
if (int.class.isAssignableFrom(fieldType)) {
field.setInt(object, ((Double) value).intValue());
return true;
String.format(msg, className, fieldName, value.getClass().getName(), value));
}
if (boolean.class.isAssignableFrom(fieldType)) {
field.setBoolean(object, Boolean.valueOf(String.valueOf(value)).booleanValue());
return true;
}
return false;
}

static <T> boolean fieldValueForPrimitiveWrappersModified(final Class<?> fieldType, final Field field, final T object,
final Object value)
throws IllegalArgumentException, IllegalAccessException {
if (Double.class.isAssignableFrom(fieldType)) {
field.set(object, value);
return true;
}
if (Long.class.isAssignableFrom(fieldType)) {
field.set(object, Long.valueOf(((Double) value).longValue()));
return true;
}
if (Integer.class.isAssignableFrom(fieldType)) {
field.set(object, Integer.valueOf(((Double) value).intValue()));
return true;
}
if (Boolean.class.isAssignableFrom(fieldType)) {
field.set(object, Boolean.valueOf(String.valueOf(value)));
return true;
}
return false;
throw new InfluxDBMapperException(
String.format("Class '%s' field '%s' is from an unsupported type '%s'.", className, fieldName, fieldType));
}

private static Long toMillis(final long value, final TimeUnit precision) {

private static long toMillis(final long value, final TimeUnit precision) {
return TimeUnit.MILLISECONDS.convert(value, precision);
}
}
Loading

0 comments on commit 4b4dc44

Please sign in to comment.