From fd46ca11daf701cd9ba2dbc0d77fd1f4a8873e37 Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Thu, 15 Aug 2024 11:19:37 +0900 Subject: [PATCH 1/5] Make Decaton can consume any topic with deserializer --- docs/getting-started.adoc | 2 - docs/index.adoc | 2 +- ...ing-any-topic.adoc => task-extractor.adoc} | 38 ++++++++++++------ .../decaton/processor/RetryQueueingTest.java | 8 ++-- .../processor/runtime/ConsumedRecord.java | 5 +++ .../runtime/ProcessorProperties.java | 28 ++++++++++--- .../processor/runtime/ProcessorsBuilder.java | 10 ++++- .../DecatonTaskRetryQueueingProcessor.java | 22 +++++------ .../internal/DefaultTaskExtractor.java | 39 ++++++++++++++----- .../runtime/internal/PartitionContext.java | 2 +- .../runtime/internal/ProcessPipeline.java | 6 +++ .../runtime/internal/TaskRequest.java | 1 + .../processors/CompactionProcessorTest.java | 2 +- ...DecatonTaskRetryQueueingProcessorTest.java | 4 +- .../internal/DefaultTaskExtractorTest.java | 27 +++++++++++++ .../runtime/internal/ProcessPipelineTest.java | 2 +- .../internal/ProcessingContextImplTest.java | 8 ++-- .../runtime/internal/ProcessorUnitTest.java | 2 +- 18 files changed, 155 insertions(+), 53 deletions(-) rename docs/{consuming-any-topic.adoc => task-extractor.adoc} (68%) diff --git a/docs/getting-started.adoc b/docs/getting-started.adoc index 9db9029a..637e3105 100644 --- a/docs/getting-started.adoc +++ b/docs/getting-started.adoc @@ -351,8 +351,6 @@ By leveraging Decaton's deferred completion and async-client of your middleware Now you know the basics and ready to start implementing Decaton apps! -If you're attempting to consume existing topic which contains records in schema other than Decaton's task protocol, or maybe you want to use task schema that can be understandable even for non-decaton consumers. In case visit link:./consuming-any-data.adoc[Consuming Arbitrary Topic] to see how. - For those thinking to run Decaton on production, link:./monitoring.adoc[Monitoring] might helps to always ensure your Decaton processors doing good. If you're using link:https://spring.io/[Spring] for running your applications, you might wanna take a look at link:./spring-integration.adoc[Spring Integration]. diff --git a/docs/index.adoc b/docs/index.adoc index 213fec50..4be1abc6 100644 --- a/docs/index.adoc +++ b/docs/index.adoc @@ -8,7 +8,7 @@ Decaton Documents - link:./runtime.adoc[Subpartition Runtime] - link:./spring-integration.adoc[Spring Integration] - link:./tracing.adoc[Tracing] -- link:./consuming-any-data.adoc[Use Decaton for consuming topics of non-Decaton tasks] +- link:./task-extractor.adoc[Implement custom task extractor] - link:./dynamic-property-configuration.adoc[Dynamic property configuration for the processor] - link:./monitoring.adoc[Monitoring Decaton] - Features diff --git a/docs/consuming-any-topic.adoc b/docs/task-extractor.adoc similarity index 68% rename from docs/consuming-any-topic.adoc rename to docs/task-extractor.adoc index fba0ec2b..88f4dd07 100644 --- a/docs/consuming-any-topic.adoc +++ b/docs/task-extractor.adoc @@ -1,16 +1,33 @@ -Consuming Arbitrary Topic -========================= +TaskExtractor +============= :base_version: 9.0.0 :modules: common,protocol,processor -This document guides you how to consume and process topics containing records not produced by DecatonClient using Decaton processors. +[NOTE] +==== +From Decaton 9.0.0, you can just use link:../processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java[ProcessorsBuilder#consuming(String topic, Deserializer deserializer)] to consume arbitrary topics, +without worrying if the topic is produced by DecatonClient or not. + +You may need to read through this guide *ONLY* in specific situations described just below. +==== + +Decaton provides two ways to consume topics in link:../processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java[ProcessorsBuilder]: + +* `ProcessorsBuilder#consuming(String topic, Deserializer deserializer)` +* `ProcessorsBuilder#consuming(String topic, TaskExtractor deserializer)` -By default, Decaton assumes messages are produced `DecatonClient`, where task metadata are stored as link:../protocol/src/main/proto/decaton.proto[TaskMetadataProto] in record headers. -But Decaton has the capability to consume arbitrary topics other than topics produced by `DecatonClient`. +As you may have learned through link:./getting-started.adoc[Getting Started], former is the most common and convenient way to consume topics, where you can just pass a value deserializer. -This means you can use Decaton as a drop-in replacement for a vanilla KafkaConsumer to leverage powerful features like deferred completion, delayed processing and so on. +However, sometimes you may need to apply custom logic to extract a task from raw consumed messages: -Through this guide, we assume the topic is JSON-serialized and use link:https://github.com/FasterXML/jackson-databind[jackson-databind] for deserialization, but it's trivial to consume arbitrary formats other than JSON. +* You need to extract custom task metadata on consumption. (e.g. Set `scheduledTimeMillis` for delayed processing) +* You need to access additional information (e.g. record headers) for deserialization + +This is where latter way with `TaskExtractor` comes in. + +This guide will show you how to implement `TaskExtractor` and use it. + +Through this guide, we assume the topic is JSON-serialized and use link:https://github.com/FasterXML/jackson-databind[jackson-databind] for deserialization. == TaskExtractor @@ -29,6 +46,7 @@ public class JSONUserEventExtractor implements TaskExtractor { TaskMetadata metadata = TaskMetadata.builder() // Filling timestampMillis is not mandatory, but it would be useful // when you monitor delivery latency between event production time and event processing time. + // Also, this will be used for scheduling tasks when scheduledTimeMillis is set. .timestampMillis(event.getEventTimestampMillis()) // This field is not mandatory too, but you can track which application produced the task by filling this. .sourceApplicationId("event-tracker") @@ -66,7 +84,7 @@ public class UserEventProcessor implements DecatonProcessor { } ---- -As you can see, once you implement TaskExtractor, the implementation of the DecatonProcessor can be done as when you consume a regular Decaton topic. +As you can see, there's no difference the implementation of the DecatonProcessor from the case where you use `Deserializer`. Lastly, you need to instantiate link:../processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java[ProcessorSubscription] as follows. @@ -84,11 +102,9 @@ ProcessorsBuilder.consuming( ... ---- -You have to pass TaskExtractor which you implemented above instead of link:../common/src/main/java/com/linecorp/decaton/common/Deserializer.java[Deseiralizer]. - == Run Example -Now we are ready to process a JSON topic. +Now we are ready to process a JSON topic with custom task extraction logic. Before trying out, let's download and extract the kafka binary from https://kafka.apache.org/downloads to use `kafka-console-producer.sh`. diff --git a/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java b/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java index 06bd2dca..2ecd7754 100644 --- a/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java +++ b/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java @@ -259,15 +259,17 @@ public void testRetryQueueingFromCompletionTimeoutCallback() throws Exception { @Timeout(60) public void testRetryQueueingMigrateToHeader() throws Exception { DynamicProperty metadataAsHeader = - new DynamicProperty<>(ProcessorProperties.CONFIG_TASK_METADATA_AS_HEADER); - metadataAsHeader.set(false); + new DynamicProperty<>(ProcessorProperties.CONFIG_RETRY_TASK_AS_LEGACY_FORMAT); + metadataAsHeader.set(true); AtomicInteger processCount = new AtomicInteger(0); CountDownLatch migrationLatch = new CountDownLatch(1); ProcessorTestSuite .builder(rule) .numTasks(100) - .propertySupplier(StaticPropertySupplier.of(metadataAsHeader)) + .propertySupplier(StaticPropertySupplier.of( + metadataAsHeader, + Property.ofStatic(ProcessorProperties.CONFIG_PARSE_AS_LEGACY_FORMAT_WHEN_HEADER_MISSING, true))) .produceTasksWithHeaderMetadata(false) .configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> { if (ctx.metadata().retryCount() == 0) { diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ConsumedRecord.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ConsumedRecord.java index 590386b3..258bbd01 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ConsumedRecord.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ConsumedRecord.java @@ -29,6 +29,11 @@ @Builder @Accessors(fluent = true) public class ConsumedRecord { + /** + * The timestamp of the record + */ + long recordTimestamp; + /** * Headers of the record */ diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java index b71d047c..5d1672a6 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java @@ -24,6 +24,7 @@ import org.slf4j.MDC; +import com.linecorp.decaton.common.Deserializer; import com.linecorp.decaton.processor.Completion; import com.linecorp.decaton.processor.DecatonProcessor; import com.linecorp.decaton.processor.ProcessingContext; @@ -226,17 +227,31 @@ public class ProcessorProperties extends AbstractDecatonProperties { Long.MAX_VALUE, v -> v instanceof Long && (Long) v >= 0); /** - * Controls whether to produce retry tasks with task metadata as headers, instead of as deprecated - * {@link DecatonTaskRequest} format. + * Controls whether to produce retry tasks in deprecated {@link DecatonTaskRequest} format. *

- * CAUTION!!! YOU MAY NEED TO SET THIS TO FALSE WHEN YOU UPGRADE FROM 8.0.1 OR EARLIER + * CAUTION!!! YOU MAY NEED TO SET THIS TO TRUE WHEN YOU UPGRADE FROM 8.0.1 OR EARLIER *

* Please read Decaton 9.0.0 Release Note carefully. *

* Reloadable: yes */ - public static final PropertyDefinition CONFIG_TASK_METADATA_AS_HEADER = - PropertyDefinition.define("decaton.task.metadata.as.header", Boolean.class, true, + public static final PropertyDefinition CONFIG_RETRY_TASK_AS_LEGACY_FORMAT = + PropertyDefinition.define("decaton.retry.task.as.legacy.format", Boolean.class, false, + v -> v instanceof Boolean); + + /** + * Controls whether to parse records as {@link DecatonTaskRequest} format when task metadata header is missing + * when {@link Deserializer} is used, instead of parsing task directly with the deserializer and + * fill reasonably-default task metadata. + *

+ * CAUTION!!! YOU MAY NEED TO SET THIS TO TRUE WHEN YOU UPGRADE FROM 8.0.1 OR EARLIER + *

+ * Please read Decaton 9.0.0 Release Note carefully. + *

+ * Reloadable: yes + */ + public static final PropertyDefinition CONFIG_PARSE_AS_LEGACY_FORMAT_WHEN_HEADER_MISSING = + PropertyDefinition.define("decaton.parse.as.legacy.format.when.header.missing", Boolean.class, false, v -> v instanceof Boolean); public static final List> PROPERTY_DEFINITIONS = @@ -253,7 +268,8 @@ public class ProcessorProperties extends AbstractDecatonProperties { CONFIG_DEFERRED_COMPLETE_TIMEOUT_MS, CONFIG_PROCESSOR_THREADS_TERMINATION_TIMEOUT_MS, CONFIG_PER_KEY_QUOTA_PROCESSING_RATE, - CONFIG_TASK_METADATA_AS_HEADER)); + CONFIG_RETRY_TASK_AS_LEGACY_FORMAT, + CONFIG_PARSE_AS_LEGACY_FORMAT_WHEN_HEADER_MISSING)); /** * Find and return a {@link PropertyDefinition} from its name. diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java index b519ac63..1e5bbacb 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java @@ -22,6 +22,7 @@ import com.linecorp.decaton.common.Deserializer; import com.linecorp.decaton.processor.DecatonProcessor; +import com.linecorp.decaton.processor.TaskMetadata; import com.linecorp.decaton.processor.runtime.internal.DecatonProcessorSupplierImpl; import com.linecorp.decaton.processor.runtime.internal.DefaultTaskExtractor; import com.linecorp.decaton.processor.runtime.internal.Processors; @@ -53,7 +54,13 @@ public ProcessorsBuilder(String topic, TaskExtractor taskExtractor, TaskExtra /** * Create new {@link ProcessorsBuilder} that consumes message from topic expecting tasks of type - * which can be parsed by valueParser. + * which can be parsed by deserializer. + *

+ * From Decaton 9.0.0, you can use this overload to consume tasks from arbitrary topics not only + * topics that are produced by DecatonClient. + *

+ * If you want to extract custom {@link TaskMetadata} (e.g. for delayed processing), you can use + * {@link #consuming(String, TaskExtractor)} instead. * @param topic the name of topic to consume. * @param deserializer the deserializer to instantiate task of type {@link T} from serialized bytes. * @param the type of instantiated tasks. @@ -134,6 +141,7 @@ public DecatonTask extract(ConsumedRecord record) { DecatonTask outerTask = outerExtractor.extract(record); ConsumedRecord inner = ConsumedRecord .builder() + .recordTimestamp(record.recordTimestamp()) .headers(record.headers()) .key(record.key()) .value(outerTask.taskDataBytes()) diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java index 1535145a..acea8b56 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java @@ -45,14 +45,14 @@ public class DecatonTaskRetryQueueingProcessor implements DecatonProcessor metadataAsHeaderProperty; + private final Property retryTaskAsLegacyFormatProperty; public DecatonTaskRetryQueueingProcessor(SubscriptionScope scope, DecatonTaskProducer producer) { RetryConfig retryConfig = scope.retryConfig().get(); // This won't be instantiated unless it present this.producer = producer; backoff = retryConfig.backoff(); retryTopic = scope.retryTopic().get(); // This won't be instantiated unless it present - metadataAsHeaderProperty = scope.props().get(ProcessorProperties.CONFIG_TASK_METADATA_AS_HEADER); + retryTaskAsLegacyFormatProperty = scope.props().get(ProcessorProperties.CONFIG_RETRY_TASK_AS_LEGACY_FORMAT); metrics = Metrics.withTags("subscription", scope.subscriptionId()).new RetryMetrics(); } @@ -70,15 +70,7 @@ public void process(ProcessingContext context, byte[] serializedTask) .build(); final ProducerRecord record; - if (metadataAsHeaderProperty.value()) { - record = new ProducerRecord<>( - retryTopic, - null, - context.key(), - serializedTask, - context.headers()); - TaskMetadataUtil.writeAsHeader(taskMetadata, record.headers()); - } else { + if (retryTaskAsLegacyFormatProperty.value()) { DecatonTaskRequest request = DecatonTaskRequest.newBuilder() .setMetadata(taskMetadata) @@ -90,6 +82,14 @@ record = new ProducerRecord<>( context.key(), request.toByteArray(), context.headers()); + } else { + record = new ProducerRecord<>( + retryTopic, + null, + context.key(), + serializedTask, + context.headers()); + TaskMetadataUtil.writeAsHeader(taskMetadata, record.headers()); } metrics.retryTaskRetries.record(nextRetryCount); diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java index 458454d0..b11b2390 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java @@ -31,8 +31,13 @@ @RequiredArgsConstructor public class DefaultTaskExtractor implements TaskExtractor { + private static final ThreadLocal parseAsLegacyWhenHeaderMissing = ThreadLocal.withInitial(() -> false); private final Deserializer taskDeserializer; + public static void setParseAsLegacyWhenHeaderMissing(boolean parseAsLegacy) { + parseAsLegacyWhenHeaderMissing.set(parseAsLegacy); + } + @Override public DecatonTask extract(ConsumedRecord record) { TaskMetadataProto headerMeta = TaskMetadataUtil.readFromHeader(record.headers()); @@ -43,17 +48,33 @@ public DecatonTask extract(ConsumedRecord record) { taskDeserializer.deserialize(taskDataBytes), taskDataBytes); } else { - try { - DecatonTaskRequest taskRequest = DecatonTaskRequest.parseFrom(record.value()); - TaskMetadata metadata = TaskMetadata.fromProto(taskRequest.getMetadata()); - byte[] taskDataBytes = taskRequest.getSerializedTask().toByteArray(); + // There are two cases where task metadata header is missing: + // 1. The task is produced by an old producer which wraps tasks in DecatonTaskRequest proto. + // 2. The task is produced by non-DecatonClient producer. + // + // From Decaton perspective, there is no way to distinguish between these two cases, + // so we need to rely on a configuration to determine how to deserialize the task. + if (parseAsLegacyWhenHeaderMissing.get()) { + try { + DecatonTaskRequest taskRequest = DecatonTaskRequest.parseFrom(record.value()); + TaskMetadata metadata = TaskMetadata.fromProto(taskRequest.getMetadata()); + byte[] taskDataBytes = taskRequest.getSerializedTask().toByteArray(); + return new DecatonTask<>( + metadata, + taskDeserializer.deserialize(taskDataBytes), + taskDataBytes); + } catch (InvalidProtocolBufferException e) { + throw new IllegalArgumentException(e); + } + } else { + T task = taskDeserializer.deserialize(record.value()); return new DecatonTask<>( - metadata, - taskDeserializer.deserialize(taskDataBytes), - taskDataBytes); - } catch (InvalidProtocolBufferException e) { - throw new IllegalArgumentException(e); + TaskMetadata.builder() + .timestampMillis(record.recordTimestamp()) + .build(), + task, + record.value()); } } } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContext.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContext.java index bd3f0f4a..3aff3419 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContext.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContext.java @@ -174,7 +174,7 @@ public void addRecord(ConsumerRecord record, QuotaApplier quotaApplier) { if (!quotaApplier.apply(record, offsetState, maybeRecordQuotaUsage(record.key()))) { TaskRequest request = new TaskRequest( - scope.topicPartition(), record.offset(), offsetState, record.key(), + record.timestamp(), scope.topicPartition(), record.offset(), offsetState, record.key(), record.headers(), traceHandle, record.value(), maybeRecordQuotaUsage(record.key())); subPartitions.addTask(request); } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java index 61d5265a..5ab6e659 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java @@ -124,9 +124,15 @@ public CompletionStage scheduleThenProcess(TaskRequest request) throws Int // visible for testing DecatonTask extract(TaskRequest request) { + // This is a workaround to pass the config to TaskExtractor + // since it doesn't have a reference to ProcessorProperties. + DefaultTaskExtractor.setParseAsLegacyWhenHeaderMissing( + scope.props().get(ProcessorProperties.CONFIG_PARSE_AS_LEGACY_FORMAT_WHEN_HEADER_MISSING).value()); + final DecatonTask extracted; extracted = taskExtractor.extract( ConsumedRecord.builder() + .recordTimestamp(request.recordTimestamp()) .headers(request.headers()) .key(request.key()) .value(request.rawRequestBytes()) diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/TaskRequest.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/TaskRequest.java index 3d4b6d14..2936e12f 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/TaskRequest.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/TaskRequest.java @@ -32,6 +32,7 @@ @Accessors(fluent = true) @AllArgsConstructor public class TaskRequest { + private final long recordTimestamp; private final TopicPartition topicPartition; private final long recordOffset; private final OffsetState offsetState; diff --git a/processor/src/test/java/com/linecorp/decaton/processor/processors/CompactionProcessorTest.java b/processor/src/test/java/com/linecorp/decaton/processor/processors/CompactionProcessorTest.java index fdcd09ff..47e16dbd 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/processors/CompactionProcessorTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/processors/CompactionProcessorTest.java @@ -100,7 +100,7 @@ private TaskInput put(DecatonProcessor processor, taskData, taskData.toByteArray()); TaskRequest request = new TaskRequest( - new TopicPartition("topic", 1), 1, null, name.getBytes(StandardCharsets.UTF_8), null, NoopTrace.INSTANCE, null, null); + 1723687072569L, new TopicPartition("topic", 1), 1, null, name.getBytes(StandardCharsets.UTF_8), null, NoopTrace.INSTANCE, null, null); ProcessingContext context = spy(new ProcessingContextImpl<>("subscription", request, task, diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessorTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessorTest.java index 5d45733f..b5c67830 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessorTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessorTest.java @@ -171,8 +171,8 @@ public void testLegacyRetryTaskFormat() throws Exception { SubPartitionRuntime.THREAD_POOL, Optional.of(RetryConfig.builder().backoff(RETRY_BACKOFF).build()), Optional.empty(), ProcessorProperties.builder() - .set(Property.ofStatic(ProcessorProperties.CONFIG_TASK_METADATA_AS_HEADER, - false)) + .set(Property.ofStatic(ProcessorProperties.CONFIG_RETRY_TASK_AS_LEGACY_FORMAT, + true)) .build(), NoopTracingProvider.INSTANCE, ConsumerSupplier.DEFAULT_MAX_POLL_RECORDS, DefaultSubPartitioner::new); diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java index 2208ec85..e21ee962 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.junit.jupiter.api.Test; +import com.linecorp.decaton.processor.TaskMetadata; import com.linecorp.decaton.processor.runtime.ConsumedRecord; import com.linecorp.decaton.processor.runtime.DecatonTask; import com.linecorp.decaton.protobuf.ProtocolBuffersDeserializer; @@ -39,11 +40,13 @@ public class DefaultTaskExtractorTest { .build(); @Test public void testExtract() { + DefaultTaskExtractor.setParseAsLegacyWhenHeaderMissing(true); DefaultTaskExtractor extractor = new DefaultTaskExtractor<>( new ProtocolBuffersDeserializer<>(HelloTask.parser())); ConsumedRecord record = ConsumedRecord .builder() + .recordTimestamp(1561709151628L) .headers(new RecordHeaders()) .value(LEGACY_REQUEST.toByteArray()) .build(); @@ -55,4 +58,28 @@ public void testExtract() { assertArrayEquals(TASK.toByteArray(), extracted.taskDataBytes()); } + + @Test + public void testExtractBypassLegacyFormatWhenHeaderMissing() { + DefaultTaskExtractor.setParseAsLegacyWhenHeaderMissing(false); + DefaultTaskExtractor extractor = new DefaultTaskExtractor<>( + new ProtocolBuffersDeserializer<>(HelloTask.parser())); + + ConsumedRecord record = ConsumedRecord + .builder() + .recordTimestamp(1561709151628L) + .headers(new RecordHeaders()) + .value(TASK.toByteArray()) + .build(); + + DecatonTask extracted = extractor.extract(record); + + // check that reasonably default metadata is filled + assertEquals(TaskMetadata.builder() + .timestampMillis(1561709151628L) + .build(), extracted.metadata()); + assertEquals(TASK, extracted.taskData()); + + assertArrayEquals(TASK.toByteArray(), extracted.taskDataBytes()); + } } diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipelineTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipelineTest.java index ce0bd277..8825cc10 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipelineTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipelineTest.java @@ -96,7 +96,7 @@ public class ProcessPipelineTest { private static TaskRequest taskRequest() { return new TaskRequest( - new TopicPartition("topic", 1), 1, new OffsetState(1234), "TEST".getBytes(StandardCharsets.UTF_8), null, NoopTrace.INSTANCE, TASK.toByteArray(), null); + 1723687072569L, new TopicPartition("topic", 1), 1, new OffsetState(1234), "TEST".getBytes(StandardCharsets.UTF_8), null, NoopTrace.INSTANCE, TASK.toByteArray(), null); } @Mock diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImplTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImplTest.java index b0960c6d..d0dc4bdb 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImplTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImplTest.java @@ -69,6 +69,8 @@ @ExtendWith(MockitoExtension.class) public class ProcessingContextImplTest { + private static final long NOW = 1723687072569L; + private static class NamedProcessor implements DecatonProcessor { private final String name; private final DecatonProcessor impl; @@ -120,7 +122,7 @@ private static void terminateExecutor(ExecutorService executor) throws Interrupt private static ProcessingContextImpl context(RecordTraceHandle traceHandle, DecatonProcessor... processors) { TaskRequest request = new TaskRequest( - new TopicPartition("topic", 1), 1, null, "TEST".getBytes(StandardCharsets.UTF_8), + NOW, new TopicPartition("topic", 1), 1, null, "TEST".getBytes(StandardCharsets.UTF_8), null, traceHandle, TASK.toByteArray(), null); DecatonTask task = new DecatonTask<>( TaskMetadata.builder().build(), TASK, TASK.toByteArray()); @@ -366,7 +368,7 @@ public void testRetry() throws InterruptedException { CountDownLatch retryLatch = new CountDownLatch(1); DecatonProcessor retryProcessor = spy(new AsyncCompleteProcessor(retryLatch)); TaskRequest request = new TaskRequest( - new TopicPartition("topic", 1), 1, null, "TEST".getBytes(StandardCharsets.UTF_8), null, null, TASK.toByteArray(), null); + NOW, new TopicPartition("topic", 1), 1, null, "TEST".getBytes(StandardCharsets.UTF_8), null, null, TASK.toByteArray(), null); DecatonTask task = new DecatonTask<>( TaskMetadata.builder().build(), TASK.toByteArray(), TASK.toByteArray()); @@ -397,7 +399,7 @@ public void testRetryAtCompletionTimeout() throws InterruptedException { CountDownLatch retryLatch = new CountDownLatch(1); DecatonProcessor retryProcessor = spy(new AsyncCompleteProcessor(retryLatch)); TaskRequest request = new TaskRequest( - new TopicPartition("topic", 1), 1, null, "TEST".getBytes(StandardCharsets.UTF_8), null, null, TASK.toByteArray(), null); + NOW, new TopicPartition("topic", 1), 1, null, "TEST".getBytes(StandardCharsets.UTF_8), null, null, TASK.toByteArray(), null); DecatonTask task = new DecatonTask<>( TaskMetadata.builder().build(), TASK.toByteArray(), TASK.toByteArray()); diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorUnitTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorUnitTest.java index ed832b05..5038d5e1 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorUnitTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorUnitTest.java @@ -67,7 +67,7 @@ public void setUp() { unit = spy(new ProcessorUnit(scope, pipeline, Executors.newSingleThreadExecutor())); - taskRequest = new TaskRequest(topicPartition, 1, new OffsetState(1234), null, null, null, HelloTask.getDefaultInstance().toByteArray(), null); + taskRequest = new TaskRequest(1723687072569L, topicPartition, 1, new OffsetState(1234), null, null, null, HelloTask.getDefaultInstance().toByteArray(), null); } @Test From 9b4e926c44b4be89d5aba3a7c4342a123d27bd97 Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Thu, 15 Aug 2024 13:19:24 +0900 Subject: [PATCH 2/5] fix --- .../linecorp/decaton/processor/runtime/ConsumedRecord.java | 2 +- .../linecorp/decaton/processor/runtime/ProcessorsBuilder.java | 2 +- .../processor/runtime/internal/DefaultTaskExtractor.java | 2 +- .../decaton/processor/runtime/internal/ProcessPipeline.java | 2 +- .../processor/runtime/internal/DefaultTaskExtractorTest.java | 4 ++-- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ConsumedRecord.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ConsumedRecord.java index 258bbd01..83a5a7e3 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ConsumedRecord.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ConsumedRecord.java @@ -32,7 +32,7 @@ public class ConsumedRecord { /** * The timestamp of the record */ - long recordTimestamp; + long recordTimestampMillis; /** * Headers of the record diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java index 1e5bbacb..2e1ce53e 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java @@ -141,7 +141,7 @@ public DecatonTask extract(ConsumedRecord record) { DecatonTask outerTask = outerExtractor.extract(record); ConsumedRecord inner = ConsumedRecord .builder() - .recordTimestamp(record.recordTimestamp()) + .recordTimestampMillis(record.recordTimestampMillis()) .headers(record.headers()) .key(record.key()) .value(outerTask.taskDataBytes()) diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java index b11b2390..1c5ca21a 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java @@ -71,7 +71,7 @@ public DecatonTask extract(ConsumedRecord record) { T task = taskDeserializer.deserialize(record.value()); return new DecatonTask<>( TaskMetadata.builder() - .timestampMillis(record.recordTimestamp()) + .timestampMillis(record.recordTimestampMillis()) .build(), task, record.value()); diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java index 5ab6e659..14b877e2 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java @@ -132,7 +132,7 @@ DecatonTask extract(TaskRequest request) { final DecatonTask extracted; extracted = taskExtractor.extract( ConsumedRecord.builder() - .recordTimestamp(request.recordTimestamp()) + .recordTimestampMillis(request.recordTimestamp()) .headers(request.headers()) .key(request.key()) .value(request.rawRequestBytes()) diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java index e21ee962..4b0fc5ae 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java @@ -46,7 +46,7 @@ public void testExtract() { ConsumedRecord record = ConsumedRecord .builder() - .recordTimestamp(1561709151628L) + .recordTimestampMillis(1561709151628L) .headers(new RecordHeaders()) .value(LEGACY_REQUEST.toByteArray()) .build(); @@ -67,7 +67,7 @@ public void testExtractBypassLegacyFormatWhenHeaderMissing() { ConsumedRecord record = ConsumedRecord .builder() - .recordTimestamp(1561709151628L) + .recordTimestampMillis(1561709151628L) .headers(new RecordHeaders()) .value(TASK.toByteArray()) .build(); From cbff3c2d5bd93d243c7bd451163a186b8fba1a19 Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Thu, 15 Aug 2024 16:44:15 +0900 Subject: [PATCH 3/5] get rid of workaround --- .../decaton/processor/RetryQueueingTest.java | 4 +- .../runtime/ProcessorProperties.java | 12 +++--- .../processor/runtime/ProcessorsBuilder.java | 39 ++++++++++++------- .../runtime/SubscriptionBuilder.java | 2 +- .../DecatonTaskRetryQueueingProcessor.java | 6 +-- .../internal/DefaultTaskExtractor.java | 9 ++--- .../runtime/internal/ProcessPipeline.java | 5 --- .../runtime/ProcessorSubscriptionTest.java | 6 +-- ...DecatonTaskRetryQueueingProcessorTest.java | 2 +- .../internal/DefaultTaskExtractorTest.java | 10 +++-- .../runtime/internal/ProcessorsTest.java | 4 +- 11 files changed, 54 insertions(+), 45 deletions(-) diff --git a/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java b/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java index 2ecd7754..62ba7da5 100644 --- a/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java +++ b/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java @@ -259,7 +259,7 @@ public void testRetryQueueingFromCompletionTimeoutCallback() throws Exception { @Timeout(60) public void testRetryQueueingMigrateToHeader() throws Exception { DynamicProperty metadataAsHeader = - new DynamicProperty<>(ProcessorProperties.CONFIG_RETRY_TASK_AS_LEGACY_FORMAT); + new DynamicProperty<>(ProcessorProperties.CONFIG_RETRY_TASK_IN_LEGACY_FORMAT); metadataAsHeader.set(true); AtomicInteger processCount = new AtomicInteger(0); @@ -269,7 +269,7 @@ public void testRetryQueueingMigrateToHeader() throws Exception { .numTasks(100) .propertySupplier(StaticPropertySupplier.of( metadataAsHeader, - Property.ofStatic(ProcessorProperties.CONFIG_PARSE_AS_LEGACY_FORMAT_WHEN_HEADER_MISSING, true))) + Property.ofStatic(ProcessorProperties.CONFIG_LEGACY_PARSE_FALLBACK_ENABLED, true))) .produceTasksWithHeaderMetadata(false) .configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> { if (ctx.metadata().retryCount() == 0) { diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java index 5d1672a6..ca65fa32 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java @@ -235,8 +235,8 @@ public class ProcessorProperties extends AbstractDecatonProperties { *

* Reloadable: yes */ - public static final PropertyDefinition CONFIG_RETRY_TASK_AS_LEGACY_FORMAT = - PropertyDefinition.define("decaton.retry.task.as.legacy.format", Boolean.class, false, + public static final PropertyDefinition CONFIG_RETRY_TASK_IN_LEGACY_FORMAT = + PropertyDefinition.define("decaton.retry.task.in.legacy.format", Boolean.class, false, v -> v instanceof Boolean); /** @@ -250,8 +250,8 @@ public class ProcessorProperties extends AbstractDecatonProperties { *

* Reloadable: yes */ - public static final PropertyDefinition CONFIG_PARSE_AS_LEGACY_FORMAT_WHEN_HEADER_MISSING = - PropertyDefinition.define("decaton.parse.as.legacy.format.when.header.missing", Boolean.class, false, + public static final PropertyDefinition CONFIG_LEGACY_PARSE_FALLBACK_ENABLED = + PropertyDefinition.define("decaton.legacy.parse.fallback.enabled", Boolean.class, false, v -> v instanceof Boolean); public static final List> PROPERTY_DEFINITIONS = @@ -268,8 +268,8 @@ public class ProcessorProperties extends AbstractDecatonProperties { CONFIG_DEFERRED_COMPLETE_TIMEOUT_MS, CONFIG_PROCESSOR_THREADS_TERMINATION_TIMEOUT_MS, CONFIG_PER_KEY_QUOTA_PROCESSING_RATE, - CONFIG_RETRY_TASK_AS_LEGACY_FORMAT, - CONFIG_PARSE_AS_LEGACY_FORMAT_WHEN_HEADER_MISSING)); + CONFIG_RETRY_TASK_IN_LEGACY_FORMAT, + CONFIG_LEGACY_PARSE_FALLBACK_ENABLED)); /** * Find and return a {@link PropertyDefinition} from its name. diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java index 2e1ce53e..c64cc2a5 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.function.Function; import java.util.function.Supplier; import com.linecorp.decaton.common.Deserializer; @@ -28,7 +29,6 @@ import com.linecorp.decaton.processor.runtime.internal.Processors; import lombok.Getter; -import lombok.RequiredArgsConstructor; import lombok.experimental.Accessors; /** @@ -40,15 +40,17 @@ public class ProcessorsBuilder { @Getter private final String topic; - private final TaskExtractor taskExtractor; - private final TaskExtractor retryTaskExtractor; + private final Function, TaskExtractor> taskExtractorConstructor; + private final Function, TaskExtractor> retryTaskExtractorConstructor; private final List> suppliers; - public ProcessorsBuilder(String topic, TaskExtractor taskExtractor, TaskExtractor retryTaskExtractor) { + ProcessorsBuilder(String topic, + Function, TaskExtractor> taskExtractorConstructor, + Function, TaskExtractor> retryTaskExtractorConstructor) { this.topic = topic; - this.taskExtractor = taskExtractor; - this.retryTaskExtractor = retryTaskExtractor; + this.taskExtractorConstructor = taskExtractorConstructor; + this.retryTaskExtractorConstructor = retryTaskExtractorConstructor; suppliers = new ArrayList<>(); } @@ -67,8 +69,8 @@ public ProcessorsBuilder(String topic, TaskExtractor taskExtractor, TaskExtra * @return an instance of {@link ProcessorsBuilder}. */ public static ProcessorsBuilder consuming(String topic, Deserializer deserializer) { - DefaultTaskExtractor taskExtractor = new DefaultTaskExtractor<>(deserializer); - return new ProcessorsBuilder<>(topic, taskExtractor, taskExtractor); + Function, TaskExtractor> constructor = prop -> new DefaultTaskExtractor<>(deserializer, prop); + return new ProcessorsBuilder<>(topic, constructor, constructor); } /** @@ -80,7 +82,9 @@ public static ProcessorsBuilder consuming(String topic, Deserializer d * @return an instance of {@link ProcessorsBuilder}. */ public static ProcessorsBuilder consuming(String topic, TaskExtractor taskExtractor) { - return new ProcessorsBuilder<>(topic, taskExtractor, new RetryTaskExtractor<>(taskExtractor)); + return new ProcessorsBuilder<>(topic, + ignored -> taskExtractor, + prop -> new RetryTaskExtractor<>(prop, taskExtractor)); } /** @@ -123,15 +127,24 @@ public ProcessorsBuilder thenProcess(DecatonProcessor processor) { return thenProcess(new DecatonProcessorSupplierImpl<>(() -> processor, ProcessorScope.PROVIDED)); } - Processors build(DecatonProcessorSupplier retryProcessorSupplier) { - return new Processors<>(suppliers, retryProcessorSupplier, taskExtractor, retryTaskExtractor); + Processors build(DecatonProcessorSupplier retryProcessorSupplier, ProcessorProperties properties) { + Property legacyFallbackEnabledProperty = properties.get(ProcessorProperties.CONFIG_LEGACY_PARSE_FALLBACK_ENABLED); + return new Processors<>(suppliers, + retryProcessorSupplier, + taskExtractorConstructor.apply(legacyFallbackEnabledProperty), + retryTaskExtractorConstructor.apply(legacyFallbackEnabledProperty)); } - @RequiredArgsConstructor private static class RetryTaskExtractor implements TaskExtractor { - private final DefaultTaskExtractor outerExtractor = new DefaultTaskExtractor<>(bytes -> bytes); + private final DefaultTaskExtractor outerExtractor; private final TaskExtractor innerExtractor; + RetryTaskExtractor(Property legacyFallbackEnabledProperty, + TaskExtractor innerExtractor) { + this.innerExtractor = innerExtractor; + this.outerExtractor = new DefaultTaskExtractor<>(bytes -> bytes, legacyFallbackEnabledProperty); + } + @Override public DecatonTask extract(ConsumedRecord record) { // Retry tasks might be stored in retry-topic in DecatonTaskRequest format depending on diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java index 7b68be0c..81af148b 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java @@ -263,7 +263,7 @@ public ProcessorSubscription build() { return new ProcessorSubscription(scope, consumerSupplier.get(), quotaApplier(scope), - processorsBuilder.build(maybeRetryProcessorSupplier(scope)), + processorsBuilder.build(maybeRetryProcessorSupplier(scope), props), props, stateListener); } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java index acea8b56..a2a8211a 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java @@ -45,14 +45,14 @@ public class DecatonTaskRetryQueueingProcessor implements DecatonProcessor retryTaskAsLegacyFormatProperty; + private final Property retryTaskInLegacyFormatProperty; public DecatonTaskRetryQueueingProcessor(SubscriptionScope scope, DecatonTaskProducer producer) { RetryConfig retryConfig = scope.retryConfig().get(); // This won't be instantiated unless it present this.producer = producer; backoff = retryConfig.backoff(); retryTopic = scope.retryTopic().get(); // This won't be instantiated unless it present - retryTaskAsLegacyFormatProperty = scope.props().get(ProcessorProperties.CONFIG_RETRY_TASK_AS_LEGACY_FORMAT); + retryTaskInLegacyFormatProperty = scope.props().get(ProcessorProperties.CONFIG_RETRY_TASK_IN_LEGACY_FORMAT); metrics = Metrics.withTags("subscription", scope.subscriptionId()).new RetryMetrics(); } @@ -70,7 +70,7 @@ public void process(ProcessingContext context, byte[] serializedTask) .build(); final ProducerRecord record; - if (retryTaskAsLegacyFormatProperty.value()) { + if (retryTaskInLegacyFormatProperty.value()) { DecatonTaskRequest request = DecatonTaskRequest.newBuilder() .setMetadata(taskMetadata) diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java index 1c5ca21a..139034cd 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java @@ -22,6 +22,7 @@ import com.linecorp.decaton.client.internal.TaskMetadataUtil; import com.linecorp.decaton.processor.runtime.ConsumedRecord; import com.linecorp.decaton.processor.runtime.DecatonTask; +import com.linecorp.decaton.processor.runtime.Property; import com.linecorp.decaton.processor.runtime.TaskExtractor; import com.linecorp.decaton.processor.TaskMetadata; import com.linecorp.decaton.protocol.internal.DecatonInternal.DecatonTaskRequest; @@ -31,12 +32,8 @@ @RequiredArgsConstructor public class DefaultTaskExtractor implements TaskExtractor { - private static final ThreadLocal parseAsLegacyWhenHeaderMissing = ThreadLocal.withInitial(() -> false); private final Deserializer taskDeserializer; - - public static void setParseAsLegacyWhenHeaderMissing(boolean parseAsLegacy) { - parseAsLegacyWhenHeaderMissing.set(parseAsLegacy); - } + private final Property legacyFallbackEnabledProperty; @Override public DecatonTask extract(ConsumedRecord record) { @@ -54,7 +51,7 @@ public DecatonTask extract(ConsumedRecord record) { // // From Decaton perspective, there is no way to distinguish between these two cases, // so we need to rely on a configuration to determine how to deserialize the task. - if (parseAsLegacyWhenHeaderMissing.get()) { + if (legacyFallbackEnabledProperty.value()) { try { DecatonTaskRequest taskRequest = DecatonTaskRequest.parseFrom(record.value()); TaskMetadata metadata = TaskMetadata.fromProto(taskRequest.getMetadata()); diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java index 14b877e2..090c48d3 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java @@ -124,11 +124,6 @@ public CompletionStage scheduleThenProcess(TaskRequest request) throws Int // visible for testing DecatonTask extract(TaskRequest request) { - // This is a workaround to pass the config to TaskExtractor - // since it doesn't have a reference to ProcessorProperties. - DefaultTaskExtractor.setParseAsLegacyWhenHeaderMissing( - scope.props().get(ProcessorProperties.CONFIG_PARSE_AS_LEGACY_FORMAT_WHEN_HEADER_MISSING).value()); - final DecatonTask extracted; extracted = taskExtractor.extract( ConsumedRecord.builder() diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java index 86bfd396..04076617 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java @@ -149,7 +149,7 @@ private static ProcessorSubscription subscription(Consumer consu scope, consumer, NoopQuotaApplier.INSTANCE, - builder.build(null), + builder.build(null, scope.props()), scope.props(), listener); } @@ -280,7 +280,7 @@ public synchronized ConsumerRecords poll(Duration timeout) { (ConsumedRecord record) -> new DecatonTask<>( TaskMetadata.builder().build(), "dummy", record.value())) .thenProcess(processor) - .build(null), + .build(null, scope.props()), scope.props(), newState -> { if (newState == State.RUNNING) { @@ -357,7 +357,7 @@ public synchronized void commitSync(Map offse ctx.deferCompletion().complete(); taskCompleted.countDown(); }) - .build(null), + .build(null, scope.props()), scope.props(), null); subscription.start(); diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessorTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessorTest.java index b5c67830..507b557c 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessorTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessorTest.java @@ -171,7 +171,7 @@ public void testLegacyRetryTaskFormat() throws Exception { SubPartitionRuntime.THREAD_POOL, Optional.of(RetryConfig.builder().backoff(RETRY_BACKOFF).build()), Optional.empty(), ProcessorProperties.builder() - .set(Property.ofStatic(ProcessorProperties.CONFIG_RETRY_TASK_AS_LEGACY_FORMAT, + .set(Property.ofStatic(ProcessorProperties.CONFIG_RETRY_TASK_IN_LEGACY_FORMAT, true)) .build(), NoopTracingProvider.INSTANCE, ConsumerSupplier.DEFAULT_MAX_POLL_RECORDS, diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java index 4b0fc5ae..3e90c517 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java @@ -25,6 +25,8 @@ import com.linecorp.decaton.processor.TaskMetadata; import com.linecorp.decaton.processor.runtime.ConsumedRecord; import com.linecorp.decaton.processor.runtime.DecatonTask; +import com.linecorp.decaton.processor.runtime.ProcessorProperties; +import com.linecorp.decaton.processor.runtime.Property; import com.linecorp.decaton.protobuf.ProtocolBuffersDeserializer; import com.linecorp.decaton.protocol.internal.DecatonInternal.DecatonTaskRequest; import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto; @@ -40,9 +42,9 @@ public class DefaultTaskExtractorTest { .build(); @Test public void testExtract() { - DefaultTaskExtractor.setParseAsLegacyWhenHeaderMissing(true); DefaultTaskExtractor extractor = new DefaultTaskExtractor<>( - new ProtocolBuffersDeserializer<>(HelloTask.parser())); + new ProtocolBuffersDeserializer<>(HelloTask.parser()), + Property.ofStatic(ProcessorProperties.CONFIG_LEGACY_PARSE_FALLBACK_ENABLED, true)); ConsumedRecord record = ConsumedRecord .builder() @@ -61,9 +63,9 @@ public void testExtract() { @Test public void testExtractBypassLegacyFormatWhenHeaderMissing() { - DefaultTaskExtractor.setParseAsLegacyWhenHeaderMissing(false); DefaultTaskExtractor extractor = new DefaultTaskExtractor<>( - new ProtocolBuffersDeserializer<>(HelloTask.parser())); + new ProtocolBuffersDeserializer<>(HelloTask.parser()), + Property.ofStatic(ProcessorProperties.CONFIG_LEGACY_PARSE_FALLBACK_ENABLED, false)); ConsumedRecord record = ConsumedRecord .builder() diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorsTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorsTest.java index 2c2de91f..f97adcc0 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorsTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorsTest.java @@ -37,6 +37,7 @@ import com.linecorp.decaton.processor.runtime.DecatonProcessorSupplier; import com.linecorp.decaton.processor.runtime.DefaultSubPartitioner; import com.linecorp.decaton.processor.runtime.ProcessorProperties; +import com.linecorp.decaton.processor.runtime.Property; import com.linecorp.decaton.processor.runtime.SubPartitionRuntime; import com.linecorp.decaton.processor.tracing.internal.NoopTracingProvider; import com.linecorp.decaton.protocol.Sample.HelloTask; @@ -69,7 +70,8 @@ public void testCleanupPartiallyInitializedProcessors() { Processors processors = new Processors<>( suppliers, null, - new DefaultTaskExtractor<>(bytes -> HelloTask.getDefaultInstance()), + new DefaultTaskExtractor<>(bytes -> HelloTask.getDefaultInstance(), + Property.ofStatic(ProcessorProperties.CONFIG_LEGACY_PARSE_FALLBACK_ENABLED)), null); doThrow(new RuntimeException("exception")).when(suppliers.get(2)).getProcessor(any(), any(), anyInt()); From b18ac2777e09cbec7d2924d9838bca70a5bdda36 Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Thu, 15 Aug 2024 16:48:35 +0900 Subject: [PATCH 4/5] nits --- .../com/linecorp/decaton/processor/RetryQueueingTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java b/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java index 62ba7da5..84bab2c1 100644 --- a/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java +++ b/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java @@ -258,9 +258,9 @@ public void testRetryQueueingFromCompletionTimeoutCallback() throws Exception { @Test @Timeout(60) public void testRetryQueueingMigrateToHeader() throws Exception { - DynamicProperty metadataAsHeader = + DynamicProperty retryTaskInLegacyFormat = new DynamicProperty<>(ProcessorProperties.CONFIG_RETRY_TASK_IN_LEGACY_FORMAT); - metadataAsHeader.set(true); + retryTaskInLegacyFormat.set(true); AtomicInteger processCount = new AtomicInteger(0); CountDownLatch migrationLatch = new CountDownLatch(1); @@ -268,7 +268,7 @@ public void testRetryQueueingMigrateToHeader() throws Exception { .builder(rule) .numTasks(100) .propertySupplier(StaticPropertySupplier.of( - metadataAsHeader, + retryTaskInLegacyFormat, Property.ofStatic(ProcessorProperties.CONFIG_LEGACY_PARSE_FALLBACK_ENABLED, true))) .produceTasksWithHeaderMetadata(false) .configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> { @@ -278,7 +278,7 @@ public void testRetryQueueingMigrateToHeader() throws Exception { if (cnt < 50) { ctx.retry(); } else if (cnt == 50) { - metadataAsHeader.set(true); + retryTaskInLegacyFormat.set(true); migrationLatch.countDown(); ctx.retry(); } else { From 31b1b489ddba1411aea7b7ba9e3549e5f6062c4d Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Thu, 15 Aug 2024 19:50:47 +0900 Subject: [PATCH 5/5] refactor --- .../processor/runtime/ProcessorsBuilder.java | 40 +++++++++++-------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java index c64cc2a5..fd7af07c 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java @@ -18,7 +18,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.function.Function; import java.util.function.Supplier; import com.linecorp.decaton.common.Deserializer; @@ -40,17 +39,15 @@ public class ProcessorsBuilder { @Getter private final String topic; - private final Function, TaskExtractor> taskExtractorConstructor; - private final Function, TaskExtractor> retryTaskExtractorConstructor; + private final Deserializer userSuppliedDeserializer; + private final TaskExtractor userSuppliedTaskExtractor; private final List> suppliers; - ProcessorsBuilder(String topic, - Function, TaskExtractor> taskExtractorConstructor, - Function, TaskExtractor> retryTaskExtractorConstructor) { + ProcessorsBuilder(String topic, Deserializer userSuppliedDeserializer, TaskExtractor userSuppliedTaskExtractor) { this.topic = topic; - this.taskExtractorConstructor = taskExtractorConstructor; - this.retryTaskExtractorConstructor = retryTaskExtractorConstructor; + this.userSuppliedDeserializer = userSuppliedDeserializer; + this.userSuppliedTaskExtractor = userSuppliedTaskExtractor; suppliers = new ArrayList<>(); } @@ -69,8 +66,7 @@ public class ProcessorsBuilder { * @return an instance of {@link ProcessorsBuilder}. */ public static ProcessorsBuilder consuming(String topic, Deserializer deserializer) { - Function, TaskExtractor> constructor = prop -> new DefaultTaskExtractor<>(deserializer, prop); - return new ProcessorsBuilder<>(topic, constructor, constructor); + return new ProcessorsBuilder<>(topic, deserializer, null); } /** @@ -82,9 +78,7 @@ public static ProcessorsBuilder consuming(String topic, Deserializer d * @return an instance of {@link ProcessorsBuilder}. */ public static ProcessorsBuilder consuming(String topic, TaskExtractor taskExtractor) { - return new ProcessorsBuilder<>(topic, - ignored -> taskExtractor, - prop -> new RetryTaskExtractor<>(prop, taskExtractor)); + return new ProcessorsBuilder<>(topic, null, taskExtractor); } /** @@ -129,10 +123,22 @@ public ProcessorsBuilder thenProcess(DecatonProcessor processor) { Processors build(DecatonProcessorSupplier retryProcessorSupplier, ProcessorProperties properties) { Property legacyFallbackEnabledProperty = properties.get(ProcessorProperties.CONFIG_LEGACY_PARSE_FALLBACK_ENABLED); - return new Processors<>(suppliers, - retryProcessorSupplier, - taskExtractorConstructor.apply(legacyFallbackEnabledProperty), - retryTaskExtractorConstructor.apply(legacyFallbackEnabledProperty)); + + final TaskExtractor taskExtractor; + final TaskExtractor retryTaskExtractor; + + // consuming(String, Deserializer) is used + if (userSuppliedDeserializer != null) { + DefaultTaskExtractor extractor = new DefaultTaskExtractor<>(userSuppliedDeserializer, legacyFallbackEnabledProperty); + taskExtractor = extractor; + retryTaskExtractor = extractor; + } else { + // consuming(String, TaskExtractor) is used + taskExtractor = userSuppliedTaskExtractor; + retryTaskExtractor = new RetryTaskExtractor<>(legacyFallbackEnabledProperty, userSuppliedTaskExtractor); + } + + return new Processors<>(suppliers, retryProcessorSupplier, taskExtractor, retryTaskExtractor); } private static class RetryTaskExtractor implements TaskExtractor {