CloudEvents support #252
Conversation
.withHeaders(() -> (Map) headers)
.withPayload(() -> (String) valueBuffer)
.unmarshal(),
it -> ByteBuffer.wrap(Json.binaryEncode(it)).asReadOnlyBuffer()
I have yet to fully review what this PR does exactly, but the usage of JSON seems wrong. In-memory should just store events as object references, shouldn't it?
(And we don't want to use JSON anywhere, in-memory or Kafka, as a matter of fact.)
This `Function` converts the in-memory representation to a `ByteBuffer` when a pre/post-processor is unable to handle the in-memory format, mostly for legacy reasons. Eventually, the processors API will be changed to use `CloudEvent` as the type, not `Object`, and the functions will go away.

In other words, we use `CloudEvent` as the in-memory type and fall back to the JSON representation when bytes are requested (either the `BINARY` consumer format or pre/post-processors).
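A sketch of that fallback shape, reusing the `Json.binaryEncode` call from this diff (the class and constant names are illustrative, not this PR's code):

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

import io.cloudevents.CloudEvent;
import io.cloudevents.json.Json;

// Keep the CloudEvent as the in-memory value; serialize to JSON only
// when a legacy consumer/processor explicitly asks for bytes.
class BytesFallback {

    static final Function<Object, ByteBuffer> TO_BYTES = value -> {
        if (value instanceof CloudEvent) {
            // JSON is a temporary fallback representation, not the storage format
            return ByteBuffer.wrap(Json.binaryEncode(value)).asReadOnlyBuffer();
        }
        // legacy values are already raw bytes
        return (ByteBuffer) value;
    };
}
```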
D'oh! Of course, one second after replying I got what you mean 😂

In-memory forces the serialization to avoid hard references and non-serializable objects that would otherwise fail with Kafka/Pulsar/any other real storage. JSON was selected as the easiest option.
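For illustration only (not the module's actual code), the effect is roughly:

```java
import java.util.ArrayList;
import java.util.List;

import io.cloudevents.CloudEvent;
import io.cloudevents.json.Json;

// Sketch: the in-memory log keeps JSON bytes rather than the object
// reference, so hard references and non-serializable payloads fail here
// just like they would against Kafka/Pulsar/any real storage.
class InMemoryLog {

    private final List<byte[]> records = new ArrayList<>();

    void publish(CloudEvent event) {
        // serialization happens eagerly; a non-serializable event fails fast
        records.add(Json.binaryEncode(event));
    }
}
```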
request.getKey().asReadOnlyByteBuffer(),
it -> (ByteBuffer) it,
cloudEventBuilder.build(),
it -> ByteBuffer.wrap(Json.binaryEncode(it)).asReadOnlyBuffer()
Again, this feels weird, for reasons slightly different from the in-memory case (assuming I understand what this code does from a very quick glance). We don't want to use JSON as the preferred way of storing data.
The first part of this answer explains this part: #252 (comment)
tl;dr: this function is only executed when some pre-processor asks for the `ByteBuffer` of this envelope (aka "serialize it for me"). It is there for backward compatibility and will be removed once `CloudEvent` becomes the only type of `Envelope`'s value.
if (!(rawValue instanceof CloudEvent)) {
    // TODO Add Envelope#event and make CloudEvent a first-class citizen
    throw new IllegalArgumentException("Must be a CloudEvent!");
Trying this PR in the context of projectriff, it seems I'm always hitting this line. I'm sure I'm doing something wrong, but I don't know what yet.
Make sure you publish with the new protocol (and set the `liiklusEvent` field).
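For anyone hitting the same exception: a publish along these lines is what is meant. The builder and field names here (`PublishRequest`, `LiiklusEvent`, the `com.github.bsideup.liiklus.protocol` package) are my reading of this PR's proto, so treat them as assumptions:

```java
import java.util.UUID;

import com.github.bsideup.liiklus.protocol.LiiklusEvent;
import com.github.bsideup.liiklus.protocol.PublishRequest;

class PublishExample {

    static PublishRequest newRequest() {
        // CloudEvents v1 requires id, type and source on every event
        return PublishRequest.newBuilder()
                .setTopic("events")
                .setLiiklusEvent(LiiklusEvent.newBuilder()
                        .setId(UUID.randomUUID().toString())
                        .setType("com.example.greeted")
                        .setSource("/examples/greeter")
                        .build())
                .build();
    }
}
```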
Successfully tested this in the context of riff, using Kafka:
@ericbottard thanks for checking! 👍 FYI I discovered that I was using the wrong mappers (from HTTP) and adjusted it to follow the Kafka protocol binding described here:
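For reference, the binding's binary content mode puts the event data in the record value and maps each CloudEvents attribute to a `ce_`-prefixed header; a rough sketch (not this PR's mapper code):

```java
import static java.nio.charset.StandardCharsets.UTF_8;

import org.apache.kafka.clients.producer.ProducerRecord;

class BinaryModeExample {

    static ProducerRecord<byte[], byte[]> toRecord(
            byte[] key, byte[] data, String id, String type, String source) {
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("events", key, data);
        // each CloudEvents attribute travels as a ce_-prefixed Kafka header
        record.headers()
                .add("ce_specversion", "1.0".getBytes(UTF_8))
                .add("ce_id", id.getBytes(UTF_8))
                .add("ce_type", type.getBytes(UTF_8))
                .add("ce_source", source.getBytes(UTF_8));
        return record;
    }
}
```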
) {
    Map<String, String> result = new HashMap<>();

    extensions.forEach((key, value) -> {
This is a NO-OP
There is `result.put` for all non-null values, unless I am missing something :)
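i.e. the mapping ends up shaped roughly like this (types assumed):

```java
import java.util.HashMap;
import java.util.Map;

class ExtensionMapper {

    static Map<String, String> mapExtensions(Map<String, Object> extensions) {
        Map<String, String> result = new HashMap<>();
        extensions.forEach((key, value) -> {
            if (value != null) {
                // copy into a fresh map; nothing is mutated in place
                result.put(key, value.toString());
            }
        });
        return result;
    }
}
```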
Ah, my bad, I thought this was acting in place (which would cause concurrent modification anyway). So my other comment is moot too.
@ericbottard I remember that you needed extensions support.
This is what I see when I set a …
@ericbottard that's something I wanted to fix. Even in the official SDK, custom extensions are not prefixed:
@ericbottard keep in mind that I haven't decided yet whether liiklus should validate CloudEvent's values, btw (for performance reasons). WDYT?
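For context, CloudEvents v1 only requires a handful of attributes (`id`, `source`, `specversion`, `type`), so a validation pass could stay cheap. A hypothetical sketch, with the `LiiklusEvent` getters assumed:

```java
class CloudEventValidator {

    static void validate(LiiklusEvent event) {
        // CloudEvents v1: id, source and type must be non-empty
        // (specversion is implied by the LiiklusEvent message itself)
        if (event.getId().isEmpty()
                || event.getSource().isEmpty()
                || event.getType().isEmpty()) {
            throw new IllegalArgumentException(
                    "CloudEvents v1 requires non-empty id, source and type");
        }
    }
}
```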
Using … When you say …, are you implying that liiklus writes the header, or are you relying on the (currently broken) SDK? In any case, the values seem to propagate "correctly" from publication to Kafka.
@ericbottard I will fix it in Liiklus later today (make it prefix all headers with …).
@ericbottard I fixed the extensions mapping issue :)
I can confirm that I saw our headers transit all the way using extensions in Kafka.
@ericbottard thanks a lot for checking! 👍
api/src/test/java/com/github/bsideup/liiklus/records/LiiklusCloudEventTest.java
well, lgtm at this moment :) will see how it would be to implement a plugin! :D
I'm curious to see if cloudevents/spec#576 meets your needs.
@drtriumph thanks! I will definitely have a look next week when I am back from vacation 👍
CloudEvents is an essential choice for Liiklus since it is the standard that we were missing when we created Liiklus. It introduces a clear notion of `id`, `type` and other fields that can be helpful for applying schema, RBAC, metrics and many other features.

At this stage, CloudEvents v1 is used as the internal representation, and `LiiklusEvent` at the API level, since there is no Protobuf format for CloudEvents yet: cloudevents/spec#504

`LiiklusEvent` follows v1 of the spec. Once Protobuf definitions are available, we will deprecate `LiiklusEvent` and start using the `CloudEvent` message type. `oneof` and `format` are there for a smooth migration.

The implementation is mostly backward compatible but, if you have events in the old, binary format, you will need to provide a plugin to "upcast" them (use `RecordPreProcessor`/`RecordPostProcessor`); a sketch follows below.
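A minimal sketch of such an upcasting plugin, assuming the `RecordPreProcessor` signature, an `Envelope#getRawValue` accessor and an `Envelope#withValue` helper (verify all three against the actual plugin API; the builder usage follows the CloudEvents SDK version used in this PR):

```java
import java.net.URI;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import io.cloudevents.CloudEvent;
import io.cloudevents.v1.CloudEventBuilder;

public class LegacyUpcaster implements RecordPreProcessor {

    @Override
    public CompletionStage<Envelope> preProcess(Envelope envelope) {
        if (envelope.getRawValue() instanceof CloudEvent) {
            // already in the new format, pass through untouched
            return CompletableFuture.completedFuture(envelope);
        }
        // wrap the legacy payload in a minimal CloudEvents v1 envelope;
        // id/type/source below are placeholders for your own scheme
        CloudEvent<?, ?> event = CloudEventBuilder.builder()
                .withId(UUID.randomUUID().toString())
                .withType("com.example.legacy")
                .withSource(URI.create("urn:liiklus:legacy"))
                .build();
        return CompletableFuture.completedFuture(envelope.withValue(event)); // assumed helper
    }
}
```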