Make Decaton can consume any topic with deserializer #241
public static final PropertyDefinition<Boolean> CONFIG_TASK_METADATA_AS_HEADER =
PropertyDefinition.define("decaton.task.metadata.as.header", Boolean.class, true,
public static final PropertyDefinition<Boolean> CONFIG_RETRY_TASK_AS_LEGACY_FORMAT =
PropertyDefinition.define("decaton.retry.task.as.legacy.format", Boolean.class, false,
I renamed the property and flipped the meaning, because:
- For `decaton.parse.as.legacy.format.when.header.missing`, I couldn't figure out a good "enabled-by-default" name.
- Then, for `decaton.task.metadata.as.header`, during migration, "disable one and enable one" might be too confusing.
Probably `task.in.legacy.format` sounds more correct?
returning once
* Reloadable: yes
*/
public static final PropertyDefinition<Boolean> CONFIG_PARSE_AS_LEGACY_FORMAT_WHEN_HEADER_MISSING =
PropertyDefinition.define("decaton.parse.as.legacy.format.when.header.missing", Boolean.class, false,
How about `decaton.legacy.parse.fallback.enabled`? (just to make it a bit shorter...)
@@ -31,8 +31,13 @@

@RequiredArgsConstructor
public class DefaultTaskExtractor<T> implements TaskExtractor<T> {
private static final ThreadLocal<Boolean> parseAsLegacyWhenHeaderMissing = ThreadLocal.withInitial(() -> false);
h-m this might be a bit too hacky, given that we would have to maintain this compatibility mode for a certain period of time, isn't it?
Can we do it like this:
- delay `DefaultTaskExtractor` instantiation until `ProcessorsBuilder#build` is called
- so that we can instantiate it in `SubscriptionBuilder#build`
- at that point we already have access to properties, so we can simply pass a boolean flag to the `DefaultTaskExtractor` constructor?
Ah, that's way better. Let me fix
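For readers following the thread, here is a minimal sketch of the suggestion above: keep only the deserializer in the builder and create the extractor at build time, when the flag is already known. `SketchTaskExtractor` and `SketchProcessorsBuilder` are hypothetical stand-ins invented for illustration, not Decaton's actual classes or signatures.

```java
import java.util.function.Function;

// Hypothetical stand-in for DefaultTaskExtractor: the compatibility flag is a
// plain constructor argument instead of a static ThreadLocal.
final class SketchTaskExtractor<T> {
    private final Function<byte[], T> deserializer;
    private final boolean legacyFallbackEnabled;

    SketchTaskExtractor(Function<byte[], T> deserializer, boolean legacyFallbackEnabled) {
        this.deserializer = deserializer;
        this.legacyFallbackEnabled = legacyFallbackEnabled;
    }

    boolean legacyFallbackEnabled() {
        return legacyFallbackEnabled;
    }

    T extract(byte[] bytes) {
        return deserializer.apply(bytes);
    }
}

// Hypothetical stand-in for ProcessorsBuilder: it only remembers the
// deserializer and defers creating the extractor until build() is called.
final class SketchProcessorsBuilder<T> {
    private final Function<byte[], T> deserializer;

    SketchProcessorsBuilder(Function<byte[], T> deserializer) {
        this.deserializer = deserializer;
    }

    // The caller (assumed to have access to properties) supplies the flag.
    SketchTaskExtractor<T> build(boolean legacyFallbackEnabled) {
        return new SketchTaskExtractor<>(deserializer, legacyFallbackEnabled);
    }
}
```

The point of the design is that no global or thread-local state is needed: the flag travels with the instance that uses it.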
@@ -116,15 +127,24 @@ public ProcessorsBuilder<T> thenProcess(DecatonProcessor<T> processor) {
return thenProcess(new DecatonProcessorSupplierImpl<>(() -> processor, ProcessorScope.PROVIDED));
}

Processors<T> build(DecatonProcessorSupplier<byte[]> retryProcessorSupplier) {
return new Processors<>(suppliers, retryProcessorSupplier, taskExtractor, retryTaskExtractor);
Processors<T> build(DecatonProcessorSupplier<byte[]> retryProcessorSupplier, ProcessorProperties properties) {
I thought that just taking an instance of `taskExtractor` as an argument of this method was OK. Is it better to do it with the "constructor" thing?
To instantiate the TaskExtractor in `build`'s caller (i.e. SubscriptionBuilder), ProcessorsBuilder needs to store the `Deserializer` or `TaskExtractor` passed from the user through `consuming`, and SubscriptionBuilder needs to instantiate the appropriate resulting TaskExtractor accordingly, which is fairly complicated.
Maybe just deferring the instantiation via a constructor is more straightforward.
Ah, but storing constructor might be unnecessary actually.
Let me fix
//
// From Decaton perspective, there is no way to distinguish between these two cases,
// so we need to rely on a configuration to determine how to deserialize the task.
if (legacyFallbackEnabledProperty.value()) {
h-m should this be dynamically reloadable? i just sense that it might be better fixed for the whole lifetime of this subscription, but idk
IMO we should make a property reloadable unless it's difficult or there is a big downside, because it reduces the number of rolling restarts (which are a real burden) and enables quick rollback.
Since the upgrade procedure might be tough already, making it reloadable might be better.
The downside I can imagine is the blast radius when a property change causes a problem (because the change takes effect on all instances at the same time), but quick rollback by dynamic reloading is probably preferable to a rolling restart of the application, I guess.
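To make the trade-off concrete, here is a small sketch of what "reloadable" means in practice: the flag is read each time a record is handled rather than captured once at construction. `SketchFallbackSwitch` and the `BooleanSupplier` wiring are assumptions made for illustration and are not Decaton's property API.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper: consults the current property value on every call,
// so an update takes effect without restarting the consumer.
final class SketchFallbackSwitch {
    private final BooleanSupplier legacyFallbackEnabled;

    SketchFallbackSwitch(BooleanSupplier legacyFallbackEnabled) {
        this.legacyFallbackEnabled = legacyFallbackEnabled;
    }

    // Returns which parsing path would be taken for a record without the header.
    String parserForHeaderlessRecord() {
        return legacyFallbackEnabled.getAsBoolean() ? "legacy" : "direct";
    }
}
```

A fixed-for-lifetime variant would instead copy the boolean in the constructor. That narrows the blast radius of a bad change but requires a restart to roll back.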
LGTM
Motivation
- … `DecatonTaskRequest` protobuf, which means records produced by DecatonClient and ones produced by non-DecatonClient producers (serialized by Kafka's serializer) have identical serialized bytes.
- … `Deserializer` support consuming non-DecatonClient topics, without forcing users to implement `TaskExtractor` (which is a bit more bothersome than just a deserializer)?

Summary of changes
- `ProcessorsBuilder#consuming(String topic, Deserializer<T> deserializer)` now deserializes message bytes directly even when the task-metadata header is missing. This is a breaking change.
- … `DecatonTaskRequest` pb first, then deserialize the serialized task in it. This was for the `<= 8.0.0 to 9.0.0` migration, where an old DecatonClient might produce records during the upgrade.
- … `decaton.parse.as.legacy.format.when.header.missing` property to control the behavior for graceful migration.
- … `TaskMetadata#timestampMillis`.
- … `timestampMillis`, we add `ConsumedRecord#recordTimestampMillis`, which can be retrieved from the record itself.

Expected upgrade procedure from <= 8.0.1 to 9.0.0
- … `decaton.retry.task.as.legacy.format` and `decaton.parse.as.legacy.format.when.header.missing`
- … `decaton.retry.task.as.legacy.format`
- … `decaton.parse.as.legacy.format.when.header.missing`

I'll describe the details in the 9.0.0 release note.
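
As a rough illustration of the header-missing behavior summarized above, the decision can be sketched as a pure function. `SketchParseMode` and `SketchParseDecision` are hypothetical names. Treating a record that does carry the task-metadata header as directly deserializable is an assumption, not something this description states.

```java
// Hypothetical enum naming the two parsing paths discussed in this PR.
enum SketchParseMode {
    DIRECT,          // hand the record value straight to the user's Deserializer
    LEGACY_WRAPPER   // parse the legacy wrapper first, then deserialize the task inside it
}

final class SketchParseDecision {
    private SketchParseDecision() {}

    static SketchParseMode decide(boolean hasTaskMetadataHeader, boolean legacyFallbackEnabled) {
        if (hasTaskMetadataHeader) {
            // Assumption: a record with the header is in the new format.
            return SketchParseMode.DIRECT;
        }
        // Header missing: the bytes alone cannot tell the two producers apart,
        // so the configured flag decides.
        return legacyFallbackEnabled ? SketchParseMode.LEGACY_WRAPPER : SketchParseMode.DIRECT;
    }
}
```

With the flag left at `false`, a headerless record is deserialized directly, which matches the breaking change noted in the summary.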