Spring Cloud Stream HTTP source that validates the payload against a JSON Schema or a POJO and sends it to the output stream. Partitioning is supported.
Check this README for details.
You can define the Kafka topic this way:
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: foobar
To return HTTP 500 when Kafka is unavailable, enable synchronous sending in the Kafka binder:
spring:
  cloud:
    stream:
      kafka:
        bindings:
          output:
            producer:
              sync: true
The service can validate the payload against a JSON Schema and a POJO class. If both mechanisms are configured, the JSON Schema takes precedence.
To validate the incoming payload against a JSON Schema, set the schema location in the configuration. You can place the schema on the classpath or refer to a specific file:
http:
  json:
    schema-location: <location, e.g. classpath:/schema.json or file:///path/to/schema.json>
To validate the incoming payload against a POJO, add the class to the classpath and put its fully qualified name into the configuration. Validation is performed using Jackson and Bean Validation (JSR 380).
http:
  pojo:
    class-name: <full class name, e.g. some.package.Person>
The POJO can use javax.validation annotations. For an example, see the sample model in the tests.
To reject payloads with unknown properties, set the Jackson deserialization property as follows:
spring:
  jackson:
    deserialization:
      FAIL_ON_UNKNOWN_PROPERTIES: true
Messages can be partitioned in two ways:
- By Spring Cloud Stream
- By Kafka itself
When partitioning is done by Spring Cloud Stream, it calculates from the message key the partition to which the message should be published. Put the following configuration into the application.yml file:
spring:
  cloud:
    stream:
      bindings:
        output:
          producer:
            partitionCount: <...>
            partitionKeyExpression: <...>
Where:
- partitionCount defines the number of partitions for the topic
- partitionKeyExpression defines the SpEL expression used to calculate the key that will be used to partition the message, e.g. payload.id will use the id field from the request body
- check the Spring Cloud Stream reference for details and other partitioning options
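To make the roles of partitionCount and the partition key concrete, here is a minimal sketch of the default selection, which the Spring Cloud Stream reference describes as key.hashCode() % partitionCount. The class and method names are hypothetical and not part of Spring Cloud Stream, and the handling of negative hash codes is an assumption; the framework's internals may differ between versions.

```java
// Hypothetical illustration; not part of Spring Cloud Stream.
class PartitionSketch {

    // Maps a partition key to a partition index in [0, partitionCount).
    // Assumption: follows the documented formula key.hashCode() % partitionCount,
    // taking the absolute value so negative hash codes stay in range.
    static int selectPartition(Object key, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        return Math.abs(key.hashCode() % partitionCount);
    }

    public static void main(String[] args) {
        int partitionCount = 4;
        // The keys stand in for the value of an expression such as payload.id.
        for (String id : new String[] {"1001", "1002", "1001"}) {
            System.out.println(id + " -> partition " + selectPartition(id, partitionCount));
        }
    }
}
```

Messages with equal keys always land on the same partition, which is what preserves per-key ordering. Changing partitionCount changes the mapping for existing keys.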
When partitioning is done by Kafka itself, each message is assigned a key, from which Kafka calculates the partition to which the message should be published.
The service extracts the key from the request body, using either a JSON Path expression or the POJO class. The extracted key is stored in the keyBytes header, which can be used in the Kafka binder configuration:
spring:
  cloud:
    stream:
      kafka:
        bindings:
          output:
            producer:
              messageKeyExpression: headers['keyBytes']
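As a rough illustration of how the key bytes determine the target partition: for messages with a key, Kafka's default partitioner hashes the serialized key with murmur2 and takes the result modulo the number of partitions. The sketch below reimplements that calculation in plain Java. The class and method names are hypothetical, the real producer does this internally, and a custom partitioner would change the behavior.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration; the Kafka producer performs this step internally.
class KafkaKeyPartitionSketch {

    // 32-bit murmur2 hash over the key bytes, written to mirror the hash
    // used by Kafka's default partitioner (seed and constants as in Kafka).
    static int murmur2(byte[] data) {
        int length = data.length;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = 0x9747b28c ^ length;
        // Mix four bytes at a time, little-endian.
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix the remaining 1 to 3 bytes; the fall-through is intentional.
        int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16;
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit, then maps the hash to a partition index.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 6;
        // The keys stand in for the content of the keyBytes header.
        for (String key : new String[] {"Smith", "Jones", "Smith"}) {
            byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
            System.out.println(key + " -> partition " + partitionFor(keyBytes, numPartitions));
        }
    }
}
```

Because the partition depends only on the key bytes and the partition count, all messages with the same key go to the same partition as long as the topic's partition count does not change.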
To extract the key using JSON Path, define the JSON Path expression in the configuration:
http:
  json:
    key-expression: <JSON Path expression, e.g. $.lastname>
To extract the key using the POJO class, first define the POJO class using the http.pojo.class-name property.
Then define the SpEL expression used to calculate the key. The expression is evaluated against the request body converted to the POJO class.
http:
  pojo:
    key-expression: <SpEL expression, e.g. lastname>