
Commit

Update docs and examples for 9.0.0
ocadaruma committed Aug 14, 2024
1 parent 5ef1bea commit fbd8e03
Showing 15 changed files with 41 additions and 30 deletions.
30 changes: 20 additions & 10 deletions docs/consuming-any-data.adoc → docs/consuming-any-topic.adoc
@@ -1,20 +1,30 @@
Consuming Arbitrary Topic
=========================
:base_version: 8.0.0
:base_version: 9.0.0
:modules: common,protocol,processor

This document guides you how to consume and process topics containing records not consists of Decaton's protocol (not produced by DecatonClient) using Decaton processors.
This document covers an advanced usage of Decaton processor when you process topics containing records not produced by DecatonClient.

By default, Decaton assumes messages are serialized as `DecatonTaskRequest` defined in link:../protocol/src/main/proto/decaton.proto[decaton.proto], and `DecatonProcessor` extracts the task from its `serialized_task` field.
But Decaton has the capability to consume arbitrary topics other than topics consisting records of Decaton protocol.
[NOTE]
====
* From Decaton 9.0.0, DecatonClient no longer wraps tasks with DecatonTaskRequest protobuf message.
** The wrapper was a heritage from the time when Kafka didn't support record headers to store task metadata.
** Now task metadata is stored in record headers instead, so there are no differences in wire format between tasks produced by DecatonClient and by other clients.
* Hence, you can just use link:../processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java[ProcessorsBuilder#consuming(String topic, Deserializer<T> deserializer)] to process arbitrary topics in most cases.
** So the expected use cases of this guide are:
*** You need to apply custom task metadata extraction logic. (e.g. Set `scheduledTimeMillis` for delayed processing)
*** You need to access additional information (e.g. record headers) for deserialization
====
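As an editor's sketch of the "most cases" path described in this note: the topic name and the local model class below are illustrative (the real example uses `example.models.UserEvent`), and `Deserializer` is assumed to be Decaton's single-method byte[]-to-object interface, so a lambda can be passed directly.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

import com.fasterxml.jackson.databind.ObjectMapper;

import com.linecorp.decaton.processor.runtime.ProcessorsBuilder;

public class SimpleConsumingSketch {
    // Minimal local model so the sketch is self-contained.
    public static class UserEvent {
        public String name;
        public int age;
    }

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static ProcessorsBuilder<UserEvent> builder(String topic) {
        // The Deserializer variant of ProcessorsBuilder#consuming suffices when
        // no custom TaskMetadata extraction is needed: from 9.0.0 Decaton reads
        // task metadata from record headers when present, or applies defaults.
        return ProcessorsBuilder.consuming(topic, bytes -> {
            try {
                return MAPPER.readValue(bytes, UserEvent.class);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }
}
```

Chaining `.thenProcess(...)` on the returned builder then attaches processors as in the other docs.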

By default, Decaton assumes messages are produced by DecatonClient where task metadata is stored as link:../protocol/src/main/proto/decaton.proto[TaskMetadataProto] in record headers.

This means you can use Decaton as a drop-in replacement for a vanilla KafkaConsumer to leverage powerful features like deferred completion, delayed processing and so on.
When you consume a topic not produced by DecatonClient, you can apply custom task metadata extraction logic.

Through this guide, we assume the topic is JSON-serialized and use link:https://github.com/FasterXML/jackson-databind[jackson-databind] for deserialization, but it's trivial to consume arbitrary formats other than JSON.
Through this guide, we assume the topic is JSON-serialized and use link:https://github.com/FasterXML/jackson-databind[jackson-databind] for deserialization, but it's trivial to apply to arbitrary formats other than JSON.

== TaskExtractor

First, you need to start by implementing your own link:../processor/src/main/java/com/linecorp/decaton/processor/runtime/TaskExtractor.java[TaskExtractor] to deserialize a task from raw message bytes.
First, you need to start by implementing your own link:../processor/src/main/java/com/linecorp/decaton/processor/runtime/TaskExtractor.java[TaskExtractor] to extract a task from raw consumed messages.

[source,java]
.JSONUserEventExtractor.java
@@ -23,9 +33,9 @@ public class JSONUserEventExtractor implements TaskExtractor<UserEvent> {
private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public DecatonTask<UserEvent> extract(byte[] bytes) {
public DecatonTask<UserEvent> extract(ConsumedRecord record) {
try {
UserEvent event = MAPPER.readValue(bytes, UserEvent.class);
UserEvent event = MAPPER.readValue(record.value(), UserEvent.class);
TaskMetadata metadata = TaskMetadata.builder()
// Filling timestampMillis is not mandatory, but it would be useful
// when you monitor delivery latency between event production time and event processing time.
@@ -35,7 +45,7 @@ public class JSONUserEventExtractor implements TaskExtractor<UserEvent> {
// You can set other TaskMetadata fields as you needed
.build();
return new DecatonTask<>(metadata, event, bytes);
return new DecatonTask<>(metadata, event, record.value());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
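Tying the updated `extract(ConsumedRecord)` signature in the diff above to the use cases listed in the note, here is a hedged sketch of an extractor that reads a scheduling hint from a record header. The header name `delay-millis` is hypothetical, and `ConsumedRecord` is assumed to expose `value()` and Kafka `headers()`; the local model class stands in for `example.models.UserEvent`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;

import com.fasterxml.jackson.databind.ObjectMapper;

import com.linecorp.decaton.processor.TaskMetadata;
import com.linecorp.decaton.processor.runtime.ConsumedRecord;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.runtime.TaskExtractor;

public class DelayingJSONExtractor implements TaskExtractor<DelayingJSONExtractor.UserEvent> {
    // Minimal local model so the sketch is self-contained.
    public static class UserEvent {
        public String name;
        public int age;
    }

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public DecatonTask<UserEvent> extract(ConsumedRecord record) {
        try {
            UserEvent event = MAPPER.readValue(record.value(), UserEvent.class);
            // Hypothetical header carrying a per-record processing delay.
            long delayMillis = 0L;
            Header delayHeader = record.headers().lastHeader("delay-millis");
            if (delayHeader != null) {
                delayMillis = Long.parseLong(new String(delayHeader.value(), StandardCharsets.UTF_8));
            }
            TaskMetadata metadata = TaskMetadata.builder()
                    // scheduledTimeMillis defers processing until the given epoch time,
                    // enabling delayed processing as mentioned in the note.
                    .scheduledTimeMillis(System.currentTimeMillis() + delayMillis)
                    .build();
            return new DecatonTask<>(metadata, event, record.value());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```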
2 changes: 1 addition & 1 deletion docs/dynamic-property-configuration.adoc
@@ -1,6 +1,6 @@
Dynamic Property Configuration
=============================
:base_version: 8.0.0
:base_version: 9.0.0
:modules: centraldogma,processor

== Property Supplier
@@ -23,6 +23,7 @@

import com.linecorp.decaton.processor.TaskMetadata;

import com.linecorp.decaton.processor.runtime.ConsumedRecord;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.runtime.TaskExtractor;
import example.models.UserEvent;
@@ -31,9 +32,9 @@ public class JSONUserEventExtractor implements TaskExtractor<UserEvent> {
private static final ObjectMapper MAPPER = new ObjectMapper();

@Override
public DecatonTask<UserEvent> extract(byte[] bytes) {
public DecatonTask<UserEvent> extract(ConsumedRecord record) {
try {
UserEvent event = MAPPER.readValue(bytes, UserEvent.class);
UserEvent event = MAPPER.readValue(record.value(), UserEvent.class);
TaskMetadata metadata = TaskMetadata.builder()
// Filling timestampMillis is not mandatory, but it would be useful
// when you monitor delivery latency between event production time and event processing time.
@@ -43,7 +44,7 @@ public DecatonTask<UserEvent> extract(byte[] bytes) {
// You can set other TaskMetadata fields as you needed
.build();

return new DecatonTask<>(metadata, event, bytes);
return new DecatonTask<>(metadata, event, record.value());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
6 changes: 3 additions & 3 deletions docs/example/src/main/java/example/TaskBatchingMain.java
@@ -43,16 +43,16 @@ public static void main(String[] args) throws Exception {
consumerConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-decaton-processor");

TaskExtractor<HelloTask> extractor = bytes -> {
TaskExtractor<HelloTask> extractor = record -> {
TaskMetadata metadata = TaskMetadata.builder().build();
HelloTask data;
try {
data = new ObjectMapper().readValue(bytes, HelloTask.class);
data = new ObjectMapper().readValue(record.value(), HelloTask.class);
} catch (IOException e) {
throw new RuntimeException(e);
}

return new DecatonTask<>(metadata, data, bytes);
return new DecatonTask<>(metadata, data, record.value());
};
long lingerMillis = 1000;
int capacity = 100;
6 changes: 3 additions & 3 deletions docs/example/src/main/java/example/TaskCompactionMain.java
@@ -44,16 +44,16 @@ public static void main(String[] args) throws Exception {
System.getProperty("bootstrap.servers"));
consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-decaton-processor");

TaskExtractor<LocationEvent> extractor = bytes -> {
TaskExtractor<LocationEvent> extractor = record -> {
TaskMetadata metadata = TaskMetadata.builder().build();
LocationEvent data;
try {
data = new ObjectMapper().readValue(bytes, LocationEvent.class);
data = new ObjectMapper().readValue(record.value(), LocationEvent.class);
} catch (IOException e) {
throw new RuntimeException(e);
}

return new DecatonTask<>(metadata, data, bytes);
return new DecatonTask<>(metadata, data, record.value());
};

ProcessorSubscription subscription =
2 changes: 1 addition & 1 deletion docs/getting-started.adoc
@@ -1,6 +1,6 @@
Getting Started Decaton
=======================
:base_version: 8.0.0
:base_version: 9.0.0
:modules: common,client,processor,protobuf

Let's start from the most basic usage of Decaton client/processor.
2 changes: 1 addition & 1 deletion docs/key-blocking.adoc
@@ -1,5 +1,5 @@
= Key Blocking
:base_version: 8.0.0
:base_version: 9.0.0
:modules: processor

== Introduction
2 changes: 1 addition & 1 deletion docs/monitoring.adoc
@@ -1,6 +1,6 @@
Monitoring Decaton
==================
:base_version: 8.0.0
:base_version: 9.0.0
:modules: processor

This document guides you how to monitor your Decaton processor applications.
2 changes: 1 addition & 1 deletion docs/rate-limiting.adoc
@@ -1,5 +1,5 @@
= Rate Limiting
:base_version: 8.0.0
:base_version: 9.0.0
:modules: processor

== Introduction
2 changes: 1 addition & 1 deletion docs/retry-queueing.adoc
@@ -1,5 +1,5 @@
= Retry Queuing
:base_version: 8.0.0
:base_version: 9.0.0
:modules: processor

== Introduction
2 changes: 1 addition & 1 deletion docs/runtime.adoc
@@ -1,5 +1,5 @@
= Subpartition Runtime
:base_version: 8.0.0
:base_version: 9.0.0
:modules: processor

This document guides you what Subpartition Runtime is and how to use it.
2 changes: 1 addition & 1 deletion docs/task-batching.adoc
@@ -1,5 +1,5 @@
= Task Batching
:base_version: 8.0.0
:base_version: 9.0.0
:modules: processor

== Introduction
2 changes: 1 addition & 1 deletion docs/task-compaction.adoc
@@ -1,5 +1,5 @@
= Task Compaction
:base_version: 8.0.0
:base_version: 9.0.0
:modules: processor

== Introduction
2 changes: 1 addition & 1 deletion docs/tracing.adoc
@@ -1,5 +1,5 @@
= Tracing
:base_version: 8.0.0
:base_version: 9.0.0
:modules: brave,processor

Decaton can integrate with distributed tracing frameworks so that you can associate the processing of a message
2 changes: 1 addition & 1 deletion docs/why-decaton.adoc
@@ -1,6 +1,6 @@
Why Decaton
===========
:base_version: 8.0.0
:base_version: 9.0.0
:modules: processor

This document explains why we have decided to create a new consumer framework.