Skip to content

Commit

Permalink
feat(topicdata): added fix for incorrect datatype and missing fields …
Browse files Browse the repository at this point in the history
…in the json while producing to topic (#1233)
  • Loading branch information
GnanaJeyam committed Nov 6, 2022
1 parent 2028cc9 commit ec4acf0
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 12 deletions.
37 changes: 37 additions & 0 deletions src/main/java/org/akhq/controllers/ErrorController.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URISyntaxException;
import java.util.regex.Pattern;

@Secured(SecurityRule.IS_ANONYMOUS)
@Slf4j
Expand All @@ -33,6 +34,11 @@ public HttpResponse<?> error(HttpRequest<?> request, ApiException e) {
return renderExecption(request, e);
}

@Error(global = true)
public HttpResponse<?> error(HttpRequest<?> request, ClassCastException e) {
return extractAndRenderException(request, e);
}

// Registry
@Error(global = true)
public HttpResponse<?> error(HttpRequest<?> request, RestClientException e) {
Expand Down Expand Up @@ -109,11 +115,42 @@ public HttpResponse<?> notFound(HttpRequest<?> request) throws URISyntaxExceptio
return HttpResponse.<JsonError>notFound()
.body(error);
}

@Error(global = true)
public HttpResponse<?> error(HttpRequest<?> request, InvalidClusterException e) {
JsonError error = new JsonError(e.getMessage())
.link(Link.SELF, Link.of(request.getUri()));

return HttpResponse.status(HttpStatus.CONFLICT).body(error);
}

private HttpResponse<?> extractAndRenderException(HttpRequest<?> request, Exception e) {
String fieldRegex = "field\\s.*$";
String expectedTypeRegex = "cannot be cast to\\sclass\\s[A-z.]+";
String actualTypeRegex = "class\\s[A-z.]+";

String actualField = retrievePatternMatch(fieldRegex, e.getMessage());
String expectedType = retrievePatternMatch(expectedTypeRegex, e.getMessage()).toLowerCase();
String actualType = retrievePatternMatch(actualTypeRegex, e.getMessage()).toLowerCase();

var message = String.format("Field %s required %s but got %s", actualField, expectedType, actualType);
JsonError error = new JsonError(message)
.link(Link.SELF, Link.of(request.getUri()));

return HttpResponse.<JsonError>status(HttpStatus.CONFLICT)
.body(error);
}

private String retrievePatternMatch(String regex, String message) {
var compile = Pattern.compile(regex);
var matcher = compile.matcher(message);
if (matcher.find()) {
var group = matcher.group();
var invalidWordRegex = "field\\s|cannot\\sbe\\scast\\sto\\sclass\\s|class\\s|java.lang.";

return group.replaceAll(invalidWordRegex, "");
}

return "";
}
}
17 changes: 12 additions & 5 deletions src/main/java/org/akhq/utils/AvroSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.commons.collections.CollectionUtils;

import java.math.BigDecimal;
import java.math.MathContext;
Expand All @@ -20,11 +21,7 @@
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.*;
import java.util.stream.Collectors;

public class AvroSerializer {
Expand Down Expand Up @@ -59,6 +56,16 @@ public class AvroSerializer {

public static GenericRecord recordSerializer(Map<String, Object> record, Schema schema) {
GenericRecord returnValue = new GenericData.Record(schema);
Set<String> schemaFields = schema.getFields().stream()
.map(Schema.Field::name).collect(Collectors.toSet());

Set<String> recordFields = record.keySet();

if (schemaFields.size() != recordFields.size()) {
Object[] missingFields = CollectionUtils.disjunction(schemaFields, recordFields).stream().toArray();
throw new IllegalArgumentException(" Record does not contain followings fields ".concat(Arrays.toString(missingFields)));
}

schema
.getFields()
.forEach(field -> {
Expand Down
3 changes: 0 additions & 3 deletions src/test/java/org/akhq/modules/AvroSchemaSerializerTest.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.akhq.modules;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.akhq.configs.SchemaRegistryType;
import org.akhq.modules.schemaregistry.AvroSerializer;
import org.apache.avro.SchemaBuilder;
Expand All @@ -10,7 +9,6 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

import java.io.IOException;
import java.nio.ByteBuffer;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -66,5 +64,4 @@ void shouldFailIfDoesntMatchSchemaId() {
avroSerializer.serialize(INVALID_JSON);
});
}

}
10 changes: 6 additions & 4 deletions src/test/java/org/akhq/utils/AvroDeserializerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,17 @@ void testDefaultValue() {
+ " {\"name\": \"arrayField\", \"type\": {\"type\": \"array\", \"items\": \"double\"}, \"default\": []}"
+ " ]"
+ "}";
Map<String, Object> defaultValues = new HashMap<>();
defaultValues.put("stringField", null);
defaultValues.put("arrayField", List.of());

Schema schema = new Schema.Parser().parse(type);

GenericRecord expectedRecord = AvroSerializer.recordSerializer(Map.of(), schema);
GenericRecord expectedRecord = AvroSerializer.recordSerializer(defaultValues, schema);
assert new GenericData().validate(schema, expectedRecord);

Map<String, Object> result = AvroDeserializer.recordDeserializer(expectedRecord);
Map<String, Object> defaultValues = new HashMap<>();
defaultValues.put("stringField", null);
defaultValues.put("arrayField", List.of());

assertThat(result, is(defaultValues));
}
}
18 changes: 18 additions & 0 deletions src/test/java/org/akhq/utils/AvroSerializerTest.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package org.akhq.utils;

import org.apache.avro.SchemaBuilder;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

class AvroSerializerTest {

Expand Down Expand Up @@ -127,4 +130,19 @@ void testParseDateTime_minutes_local() {

}

private final org.apache.avro.Schema SCHEMA = SchemaBuilder
.record("schema1").namespace("org.akhq")
.fields()
.name("title").type().stringType().noDefault()
.name("release_year").type().intType().noDefault()
.name("rating").type().doubleType().noDefault()
.endRecord();

@Test
void shouldThrowIfSchemaAndRecordFieldsAreNotEqual() {
assertThrows(IllegalArgumentException.class, () -> {
AvroSerializer.recordSerializer(Map.of("title", "akhq"), SCHEMA);
});
}

}

0 comments on commit ec4acf0

Please sign in to comment.