[hotfix][docs] Fix missing forward options and improve the doc
Tan-JiaLiang committed Jan 26, 2024
1 parent b09928d commit 0d10819
Showing 3 changed files with 62 additions and 25 deletions.
80 changes: 57 additions & 23 deletions docs/content.zh/docs/connectors/table/kafka.md
@@ -176,6 +176,7 @@ CREATE TABLE KafkaTable (
<tr>
<th class="text-left" style="width: 25%">参数</th>
<th class="text-center" style="width: 8%">是否必选</th>
<th class="text-center" style="width: 8%">Forwarded</th>
<th class="text-center" style="width: 7%">默认值</th>
<th class="text-center" style="width: 10%">数据类型</th>
<th class="text-center" style="width: 50%">描述</th>
@@ -185,41 +186,47 @@ CREATE TABLE KafkaTable (
<tr>
<td><h5>connector</h5></td>
<td>必选</td>
<td>否</td>
<td style="word-wrap: break-word;">(无)</td>
<td>String</td>
<td>指定使用的连接器,Kafka 连接器使用 <code>'kafka'</code>。</td>
</tr>
<tr>
<td><h5>topic</h5></td>
<td>sink 必选</td>
<td>是</td>
<td style="word-wrap: break-word;">(无)</td>
<td>String</td>
<td>当表用作 source 时读取数据的 topic 名。亦支持用分号间隔的 topic 列表,如 <code>'topic-1;topic-2'</code>。注意,对 source 表而言,'topic' 和 'topic-pattern' 两个选项只能使用其中一个。当表被用作 sink 时,该配置表示写入的 topic 名。注意 sink 表不支持 topic 列表。</td>
</tr>
<tr>
<td><h5>topic-pattern</h5></td>
<td>可选</td>
<td>是</td>
<td style="word-wrap: break-word;">(无)</td>
<td>String</td>
<td>匹配读取 topic 名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的 topic 都将被 Kafka consumer 订阅。注意,对 source 表而言,'topic' 和 'topic-pattern' 两个选项只能使用其中一个。</td>
</tr>
<tr>
<td><h5>properties.bootstrap.servers</h5></td>
<td>必选</td>
<td>是</td>
<td style="word-wrap: break-word;">(无)</td>
<td>String</td>
<td>逗号分隔的 Kafka broker 列表。</td>
</tr>
<tr>
<td><h5>properties.group.id</h5></td>
<td>对 source 可选,不适用于 sink</td>
<td>是</td>
<td style="word-wrap: break-word;">(无)</td>
<td>String</td>
<td>Kafka source 的消费组 id。如果未指定消费组 ID,则会使用自动生成的 "KafkaSource-{tableIdentifier}" 作为消费组 ID。</td>
</tr>
<tr>
<td><h5>properties.*</h5></td>
<td>可选</td>
<td>否</td>
<td style="word-wrap: break-word;">(无)</td>
<td>String</td>
<td>
@@ -229,6 +236,7 @@ CREATE TABLE KafkaTable (
<tr>
<td><h5>format</h5></td>
<td>必选</td>
<td>否</td>
<td style="word-wrap: break-word;">(无)</td>
<td>String</td>
<td>用来序列化或反序列化 Kafka 消息的格式。
@@ -239,6 +247,7 @@ CREATE TABLE KafkaTable (
<tr>
<td><h5>key.format</h5></td>
<td>可选</td>
<td>否</td>
<td style="word-wrap: break-word;">(无)</td>
<td>String</td>
<td>用来序列化和反序列化 Kafka 消息键(Key)的格式。
@@ -250,6 +259,7 @@ CREATE TABLE KafkaTable (
<tr>
<td><h5>key.fields</h5></td>
<td>可选</td>
<td>否</td>
<td style="word-wrap: break-word;">[]</td>
<td>List&lt;String&gt;</td>
<td>表结构中用来配置消息键(Key)格式数据类型的字段列表。默认情况下该列表为空,因此消息键没有定义。
@@ -259,6 +269,7 @@ CREATE TABLE KafkaTable (
<tr>
<td><h5>key.fields-prefix</h5></td>
<td>可选</td>
<td>否</td>
<td style="word-wrap: break-word;">(无)</td>
<td>String</td>
<td>为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。
@@ -270,6 +281,7 @@ CREATE TABLE KafkaTable (
<tr>
<td><h5>value.format</h5></td>
<td>必选</td>
<td>否</td>
<td style="word-wrap: break-word;">(无)</td>
<td>String</td>
<td>序列化和反序列化 Kafka 消息体时使用的格式。
@@ -280,6 +292,7 @@ CREATE TABLE KafkaTable (
<tr>
<td><h5>value.fields-include</h5></td>
<td>可选</td>
<td>否</td>
<td style="word-wrap: break-word;">ALL</td>
<td><p>枚举类型</p> 可选值:[ALL, EXCEPT_KEY]</td>
<td>定义消息体(Value)格式如何处理消息键(Key)字段的策略。
@@ -289,6 +302,7 @@ CREATE TABLE KafkaTable (
<tr>
<td><h5>scan.startup.mode</h5></td>
<td>可选</td>
<td>是</td>
<td style="word-wrap: break-word;">group-offsets</td>
<td>Enum</td>
<td>Kafka consumer 的启动模式。有效值为:<code>'earliest-offset'</code>,<code>'latest-offset'</code>,<code>'group-offsets'</code>,<code>'timestamp'</code> 和 <code>'specific-offsets'</code>。
@@ -297,6 +311,7 @@ CREATE TABLE KafkaTable (
<tr>
<td><h5>scan.startup.specific-offsets</h5></td>
<td>可选</td>
<td>是</td>
<td style="word-wrap: break-word;">(无)</td>
<td>String</td>
<td>在使用 <code>'specific-offsets'</code> 启动模式时为每个 partition 指定 offset,例如 <code>'partition:0,offset:42;partition:1,offset:300'</code>。
@@ -305,6 +320,7 @@ CREATE TABLE KafkaTable (
<tr>
<td><h5>scan.startup.timestamp-millis</h5></td>
<td>可选</td>
<td>是</td>
<td style="word-wrap: break-word;">(无)</td>
<td>Long</td>
<td>在使用 <code>'timestamp'</code> 启动模式时指定启动的时间戳(单位毫秒)。</td>
</tr>
<tr>
<td><h5>scan.bounded.mode</h5></td>
<td>optional</td>
<td>可选</td>
<td>是</td>
<td style="word-wrap: break-word;">unbounded</td>
<td>Enum</td>
<td>Bounded mode for Kafka consumer, valid values are <code>'latest-offset'</code>, <code>'group-offsets'</code>, <code>'timestamp'</code> and <code>'specific-offsets'</code>.
See the following <a href="#bounded-ending-position">Bounded Ending Position</a> for more details.</td>
<td>Kafka consumer 的有界消费模式。有效值为:<code>'latest-offset'</code>,<code>'group-offsets'</code>,<code>'timestamp'</code> 和 <code>'specific-offsets'</code>。更多详情,请参阅下方的 <a href="#有界结束位点">有界结束位点</a>。</td>
</tr>
<tr>
<td><h5>scan.bounded.specific-offsets</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>可选</td>
<td></td>
<td style="word-wrap: break-word;">(无)</td>
<td>String</td>
<td>Specify offsets for each partition in case of <code>'specific-offsets'</code> bounded mode, e.g. <code>'partition:0,offset:42;partition:1,offset:300'</code>. If an offset for a partition is not provided it will not consume from that partition.
</td>
<td>在 <code>'specific-offsets'</code> 有界模式下,为每个 partition 指定结束 offset,例如 <code>'partition:0,offset:42;partition:1,offset:300'</code>。如果没有提供某个分区的 offset,则不会从该分区消费数据。</td>
</tr>
<tr>
<td><h5>scan.bounded.timestamp-millis</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>可选</td>
<td></td>
<td style="word-wrap: break-word;">(无)</td>
<td>Long</td>
<td>End at the specified epoch timestamp (milliseconds) used in case of <code>'timestamp'</code> bounded mode.</td>
<td>在 <code>'timestamp'</code> 有界模式下,指定消费结束的时间戳(单位毫秒)。</td>
</tr>
<tr>
<td><h5>scan.topic-partition-discovery.interval</h5></td>
<td>可选</td>
<td>是</td>
<td style="word-wrap: break-word;">5分钟</td>
<td>Duration</td>
<td>Consumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。需要显式地将 <code>'scan.topic-partition-discovery.interval'</code> 设置为 0 才能关闭此功能。</td>
</tr>
<tr>
<td><h5>sink.partitioner</h5></td>
<td>可选</td>
<td>是</td>
<td style="word-wrap: break-word;">'default'</td>
<td>String</td>
<td>Flink partition 到 Kafka partition 的分区映射关系,可选值有:
Expand All @@ -360,13 +376,31 @@ CREATE TABLE KafkaTable (
<tr>
<td><h5>sink.semantic</h5></td>
<td>可选</td>
<td>否</td>
<td style="word-wrap: break-word;">at-least-once</td>
<td>String</td>
<td>已弃用: 请使用<code>sink.delivery-guarantee</code>。</td>
</tr>
<tr>
<td><h5>sink.delivery-guarantee</h5></td>
<td>可选</td>
<td>否</td>
<td style="word-wrap: break-word;">at-least-once</td>
<td>String</td>
<td>定义 Kafka sink 的语义。有效值为 <code>'at-least-once'</code>,<code>'exactly-once'</code> 和 <code>'none'</code>。请参阅 <a href='#一致性保证'>一致性保证</a> 以获取更多细节。</td>
</tr>
<tr>
<td><h5>sink.transactional-id-prefix</h5></td>
<td>可选</td>
<td>是</td>
<td style="word-wrap: break-word;">(无)</td>
<td>String</td>
<td>如果 <code>'sink.delivery-guarantee'</code> 设置为 <code>'exactly-once'</code>,则必须设置此选项。该选项值将作为所有已打开的 Kafka 事务标识符的前缀。</td>
</tr>
<tr>
<td><h5>sink.parallelism</h5></td>
<td>可选</td>
<td>否</td>
<td style="word-wrap: break-word;">(无)</td>
<td>Integer</td>
<td>定义 Kafka sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。</td>
@@ -511,22 +545,22 @@ ROW<`version` INT, `behavior` STRING>
如果使用了 `specific-offsets`,必须使用另外一个配置项 `scan.startup.specific-offsets` 来为每个 partition 指定起始偏移量,
例如,选项值 `partition:0,offset:42;partition:1,offset:300` 表示 partition `0` 从偏移量 `42` 开始,partition `1` 从偏移量 `300` 开始。
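上述起始位点配置可以组合成如下 DDL 示意(表名、字段和 broker 地址均为假设的示例值,并非固定写法):

```sql
CREATE TABLE KafkaStartupExample (
  `user_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  -- partition 0 从偏移量 42 开始,partition 1 从偏移量 300 开始
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);
```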

### Bounded Ending Position
### 有界结束位点

The config option `scan.bounded.mode` specifies the bounded mode for Kafka consumer. The valid enumerations are:
配置项 `scan.bounded.mode` 指定了 Kafka consumer 的有界消费模式。有效值为:
<ul>
<li><span markdown="span">`group-offsets`</span>: bounded by committed offsets in ZooKeeper / Kafka brokers of a specific consumer group. This is evaluated at the start of consumption from a given partition.</li>
<li><span markdown="span">`latest-offset`</span>: bounded by latest offsets. This is evaluated at the start of consumption from a given partition.</li>
<li><span markdown="span">`timestamp`</span>: bounded by a user-supplied timestamp.</li>
<li><span markdown="span">`specific-offsets`</span>: bounded by user-supplied specific offsets for each partition.</li>
<li><span markdown="span">`group-offsets`</span>: 以特定消费者组在 ZooKeeper / Kafka broker 中提交的偏移量作为消费边界。该偏移量将在开始消费某个分区时获取。</li>
<li><span markdown="span">`latest-offset`</span>: 以最新偏移量作为消费边界。该偏移量将在开始消费某个分区时获取。</li>
<li><span markdown="span">`timestamp`</span>: 以用户指定的时间戳作为消费边界。</li>
<li><span markdown="span">`specific-offsets`</span>: 以用户为每个 partition 指定的偏移量作为消费边界。</li>
</ul>

If config option value `scan.bounded.mode` is not set the default is an unbounded table.
如果未设置配置项 `scan.bounded.mode`,则默认为无界表。

If `timestamp` is specified, another config option `scan.bounded.timestamp-millis` is required to specify a specific bounded timestamp in milliseconds since January 1, 1970 00:00:00.000 GMT.
如果使用了 `timestamp`,必须使用另外一个配置项 `scan.bounded.timestamp-millis` 来指定一个从格林尼治标准时间 1970 年 1 月 1 日 00:00:00.000 开始计算的毫秒单位时间戳作为消费边界。

If `specific-offsets` is specified, another config option `scan.bounded.specific-offsets` is required to specify specific bounded offsets for each partition,
e.g. an option value `partition:0,offset:42;partition:1,offset:300` indicates offset `42` for partition `0` and offset `300` for partition `1`. If an offset for a partition is not provided it will not consume from that partition.
如果使用了 `specific-offsets`,必须使用另外一个配置项 `scan.bounded.specific-offsets` 来为每个 partition 指定结束偏移量,
例如,选项值 `partition:0,offset:42;partition:1,offset:300` 表示 partition `0` 在偏移量 `42` 处结束,partition `1` 在偏移量 `300` 处结束。如果未提供某个分区的偏移量,则不会从该分区消费数据。
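与起始位点类似,有界结束位点也通过 WITH 参数配置。下面是一个示意(表名、字段和 broker 地址均为假设的示例值):

```sql
CREATE TABLE KafkaBoundedExample (
  `user_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  -- partition 0 到偏移量 42 结束,partition 1 到偏移量 300 结束
  'scan.bounded.mode' = 'specific-offsets',
  'scan.bounded.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);
```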

### CDC 变更日志(Changelog) Source

1 change: 1 addition & 0 deletions docs/content/docs/connectors/table/kafka.md
@@ -337,6 +337,7 @@ Connector Options
<tr>
<td><h5>scan.bounded.mode</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">unbounded</td>
<td>Enum</td>
<td>Bounded mode for Kafka consumer, valid values are <code>'latest-offset'</code>, <code>'group-offsets'</code>, <code>'timestamp'</code> and <code>'specific-offsets'</code>.
@@ -167,8 +167,10 @@ public Set<ConfigOption<?>> forwardOptions() {
SCAN_TOPIC_PARTITION_DISCOVERY,
SCAN_STARTUP_TIMESTAMP_MILLIS,
SINK_PARTITIONER,
SINK_PARALLELISM,
TRANSACTIONAL_ID_PREFIX)
TRANSACTIONAL_ID_PREFIX,
SCAN_BOUNDED_MODE,
SCAN_BOUNDED_SPECIFIC_OFFSETS,
SCAN_BOUNDED_TIMESTAMP_MILLIS)
.collect(Collectors.toSet());
}
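For context, `forwardOptions()` declares which connector options may be forwarded. A minimal, self-contained sketch of the `Stream.of(...).collect(...)` pattern used above, with plain strings standing in for Flink's `ConfigOption<?>` constants (the class name and string keys here are illustrative, not Flink API):

```java
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ForwardOptionsSketch {
    // Plain strings stand in for ConfigOption<?> constants such as
    // SCAN_BOUNDED_MODE; the real method returns Set<ConfigOption<?>>.
    static Set<String> forwardOptions() {
        return Stream.of(
                        "scan.topic-partition-discovery.interval",
                        "scan.startup.timestamp-millis",
                        "sink.partitioner",
                        "sink.parallelism",
                        "sink.transactional-id-prefix",
                        // the three options added by this commit
                        "scan.bounded.mode",
                        "scan.bounded.specific-offsets",
                        "scan.bounded.timestamp-millis")
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println(forwardOptions().contains("scan.bounded.mode")); // true
    }
}
```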

