diff --git a/docs/consuming-any-data.adoc b/docs/consuming-any-topic.adoc similarity index 68% rename from docs/consuming-any-data.adoc rename to docs/consuming-any-topic.adoc index 444e6a1a..786d3cdc 100644 --- a/docs/consuming-any-data.adoc +++ b/docs/consuming-any-topic.adoc @@ -1,20 +1,30 @@ Consuming Arbitrary Topic ========================= -:base_version: 8.0.0 +:base_version: 9.0.0 :modules: common,protocol,processor -This document guides you how to consume and process topics containing records not consists of Decaton's protocol (not produced by DecatonClient) using Decaton processors. +This document covers an advanced usage of Decaton processor when you process topics containing records not produced by DecatonClient. -By default, Decaton assumes messages are serialized as `DecatonTaskRequest` defined in link:../protocol/src/main/proto/decaton.proto[decaton.proto], and `DecatonProcessor` extracts the task from its `serialized_task` field. -But Decaton has the capability to consume arbitrary topics other than topics consisting records of Decaton protocol. +[NOTE] +==== +* From Decaton 9.0.0, DecatonClient no longer wraps tasks with DecatonTaskRequest protobuf message. +** The wrapper was a heritage from the time when Kafka didn't support record headers to store task metadata. +** Now task metadata is stored in record headers instead, so there are no differences in wire format between tasks produced by DecatonClient and by other clients. +* Hence, you can just use link:../processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java[ProcessorsBuilder#consuming(String topic, Deserializer deserializer)] to process arbitrary topics in most cases. +** So the expected use cases of this guide are: +*** You need to apply custom task metadata extraction logic. (e.g. Set `scheduledTimeMillis` for delayed processing) +*** You need to access additional information (e.g. 
record headers) for deserialization +==== + +By default, Decaton assumes messages are produced by DecatonClient where task metadata is stored as link:../protocol/src/main/proto/decaton.proto[TaskMetadataProto] in record headers. -This means you can use Decaton as a drop-in replacement for a vanilla KafkaConsumer to leverage powerful features like deferred completion, delayed processing and so on. +When you consume a topic not produced by DecatonClient, you can apply custom task metadata extraction logic. -Through this guide, we assume the topic is JSON-serialized and use link:https://github.com/FasterXML/jackson-databind[jackson-databind] for deserialization, but it's trivial to consume arbitrary formats other than JSON. +Throughout this guide, we assume the topic is JSON-serialized and use link:https://github.com/FasterXML/jackson-databind[jackson-databind] for deserialization, but it's trivial to apply the same approach to arbitrary formats other than JSON. == TaskExtractor -First, you need to start by implementing your own link:../processor/src/main/java/com/linecorp/decaton/processor/runtime/TaskExtractor.java[TaskExtractor] to deserialize a task from raw message bytes. +First, you need to start by implementing your own link:../processor/src/main/java/com/linecorp/decaton/processor/runtime/TaskExtractor.java[TaskExtractor] to extract a task from raw consumed messages. 
[source,java] .JSONUserEventExtractor.java @@ -23,9 +33,9 @@ public class JSONUserEventExtractor implements TaskExtractor { private static final ObjectMapper MAPPER = new ObjectMapper(); @Override - public DecatonTask extract(byte[] bytes) { + public DecatonTask extract(ConsumedRecord record) { try { - UserEvent event = MAPPER.readValue(bytes, UserEvent.class); + UserEvent event = MAPPER.readValue(record.value(), UserEvent.class); TaskMetadata metadata = TaskMetadata.builder() // Filling timestampMillis is not mandatory, but it would be useful // when you monitor delivery latency between event production time and event processing time. @@ -35,7 +45,7 @@ public class JSONUserEventExtractor implements TaskExtractor { // You can set other TaskMetadata fields as you needed .build(); - return new DecatonTask<>(metadata, event, bytes); + return new DecatonTask<>(metadata, event, record.value()); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/docs/dynamic-property-configuration.adoc b/docs/dynamic-property-configuration.adoc index f62b6145..3e2609fc 100644 --- a/docs/dynamic-property-configuration.adoc +++ b/docs/dynamic-property-configuration.adoc @@ -1,6 +1,6 @@ Dynamic Property Configuration ============================= -:base_version: 8.0.0 +:base_version: 9.0.0 :modules: centraldogma,processor == Property Supplier diff --git a/docs/example/src/main/java/example/JSONUserEventExtractor.java b/docs/example/src/main/java/example/JSONUserEventExtractor.java index 5209fc67..77789724 100644 --- a/docs/example/src/main/java/example/JSONUserEventExtractor.java +++ b/docs/example/src/main/java/example/JSONUserEventExtractor.java @@ -23,6 +23,7 @@ import com.linecorp.decaton.processor.TaskMetadata; +import com.linecorp.decaton.processor.runtime.ConsumedRecord; import com.linecorp.decaton.processor.runtime.DecatonTask; import com.linecorp.decaton.processor.runtime.TaskExtractor; import example.models.UserEvent; @@ -31,9 +32,9 @@ public class 
JSONUserEventExtractor implements TaskExtractor { private static final ObjectMapper MAPPER = new ObjectMapper(); @Override - public DecatonTask extract(byte[] bytes) { + public DecatonTask extract(ConsumedRecord record) { try { - UserEvent event = MAPPER.readValue(bytes, UserEvent.class); + UserEvent event = MAPPER.readValue(record.value(), UserEvent.class); TaskMetadata metadata = TaskMetadata.builder() // Filling timestampMillis is not mandatory, but it would be useful // when you monitor delivery latency between event production time and event processing time. @@ -43,7 +44,7 @@ public DecatonTask extract(byte[] bytes) { // You can set other TaskMetadata fields as you needed .build(); - return new DecatonTask<>(metadata, event, bytes); + return new DecatonTask<>(metadata, event, record.value()); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/docs/example/src/main/java/example/TaskBatchingMain.java b/docs/example/src/main/java/example/TaskBatchingMain.java index 197faf80..1b44919e 100644 --- a/docs/example/src/main/java/example/TaskBatchingMain.java +++ b/docs/example/src/main/java/example/TaskBatchingMain.java @@ -43,16 +43,16 @@ public static void main(String[] args) throws Exception { consumerConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-decaton-processor"); - TaskExtractor extractor = bytes -> { + TaskExtractor extractor = record -> { TaskMetadata metadata = TaskMetadata.builder().build(); HelloTask data; try { - data = new ObjectMapper().readValue(bytes, HelloTask.class); + data = new ObjectMapper().readValue(record.value(), HelloTask.class); } catch (IOException e) { throw new RuntimeException(e); } - return new DecatonTask<>(metadata, data, bytes); + return new DecatonTask<>(metadata, data, record.value()); }; long lingerMillis = 1000; int capacity = 100; diff --git a/docs/example/src/main/java/example/TaskCompactionMain.java 
b/docs/example/src/main/java/example/TaskCompactionMain.java index c4f0499d..f71a822f 100644 --- a/docs/example/src/main/java/example/TaskCompactionMain.java +++ b/docs/example/src/main/java/example/TaskCompactionMain.java @@ -44,16 +44,16 @@ public static void main(String[] args) throws Exception { System.getProperty("bootstrap.servers")); consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-decaton-processor"); - TaskExtractor extractor = bytes -> { + TaskExtractor extractor = record -> { TaskMetadata metadata = TaskMetadata.builder().build(); LocationEvent data; try { - data = new ObjectMapper().readValue(bytes, LocationEvent.class); + data = new ObjectMapper().readValue(record.value(), LocationEvent.class); } catch (IOException e) { throw new RuntimeException(e); } - return new DecatonTask<>(metadata, data, bytes); + return new DecatonTask<>(metadata, data, record.value()); }; ProcessorSubscription subscription = diff --git a/docs/getting-started.adoc b/docs/getting-started.adoc index ca19ab88..9db9029a 100644 --- a/docs/getting-started.adoc +++ b/docs/getting-started.adoc @@ -1,6 +1,6 @@ Getting Started Decaton ======================= -:base_version: 8.0.0 +:base_version: 9.0.0 :modules: common,client,processor,protobuf Let's start from the most basic usage of Decaton client/processor. diff --git a/docs/key-blocking.adoc b/docs/key-blocking.adoc index 204bcaaa..9904d90e 100644 --- a/docs/key-blocking.adoc +++ b/docs/key-blocking.adoc @@ -1,5 +1,5 @@ = Key Blocking -:base_version: 8.0.0 +:base_version: 9.0.0 :modules: processor == Introduction diff --git a/docs/monitoring.adoc b/docs/monitoring.adoc index 82f19e36..8c920f24 100644 --- a/docs/monitoring.adoc +++ b/docs/monitoring.adoc @@ -1,6 +1,6 @@ Monitoring Decaton ================== -:base_version: 8.0.0 +:base_version: 9.0.0 :modules: processor This document guides you how to monitor your Decaton processor applications. 
diff --git a/docs/rate-limiting.adoc b/docs/rate-limiting.adoc index 75588d73..1f17470b 100644 --- a/docs/rate-limiting.adoc +++ b/docs/rate-limiting.adoc @@ -1,5 +1,5 @@ = Rate Limiting -:base_version: 8.0.0 +:base_version: 9.0.0 :modules: processor == Introduction diff --git a/docs/retry-queueing.adoc b/docs/retry-queueing.adoc index 01dcd694..77cf86b0 100644 --- a/docs/retry-queueing.adoc +++ b/docs/retry-queueing.adoc @@ -1,5 +1,5 @@ = Retry Queuing -:base_version: 8.0.0 +:base_version: 9.0.0 :modules: processor == Introduction diff --git a/docs/runtime.adoc b/docs/runtime.adoc index 6b52230e..90158d30 100644 --- a/docs/runtime.adoc +++ b/docs/runtime.adoc @@ -1,5 +1,5 @@ = Subpartition Runtime -:base_version: 8.0.0 +:base_version: 9.0.0 :modules: processor This document guides you what Subpartition Runtime is and how to use it. diff --git a/docs/task-batching.adoc b/docs/task-batching.adoc index 514fa561..aa509978 100644 --- a/docs/task-batching.adoc +++ b/docs/task-batching.adoc @@ -1,5 +1,5 @@ = Task Batching -:base_version: 8.0.0 +:base_version: 9.0.0 :modules: processor == Introduction diff --git a/docs/task-compaction.adoc b/docs/task-compaction.adoc index 5dfd7f64..0a7a308c 100644 --- a/docs/task-compaction.adoc +++ b/docs/task-compaction.adoc @@ -1,5 +1,5 @@ = Task Compaction -:base_version: 8.0.0 +:base_version: 9.0.0 :modules: processor == Introduction diff --git a/docs/tracing.adoc b/docs/tracing.adoc index a8c0630a..adf60c08 100644 --- a/docs/tracing.adoc +++ b/docs/tracing.adoc @@ -1,5 +1,5 @@ = Tracing -:base_version: 8.0.0 +:base_version: 9.0.0 :modules: brave,processor Decaton can integrate with distributed tracing frameworks so that you can associate the processing of a message diff --git a/docs/why-decaton.adoc b/docs/why-decaton.adoc index 17637a86..0ada28fa 100644 --- a/docs/why-decaton.adoc +++ b/docs/why-decaton.adoc @@ -1,6 +1,6 @@ Why Decaton =========== -:base_version: 8.0.0 +:base_version: 9.0.0 :modules: processor This document 
explains why we have decided to create a new consumer framework.