diff --git a/docs/content.zh/docs/dev/table/tuning.md b/docs/content.zh/docs/dev/table/tuning.md index 0fe999a57bd5d..7166f44957280 100644 --- a/docs/content.zh/docs/dev/table/tuning.md +++ b/docs/content.zh/docs/dev/table/tuning.md @@ -29,7 +29,7 @@ under the License. SQL 是数据分析中使用最广泛的语言。Flink Table API 和 SQL 使用户能够以更少的时间和精力定义高效的流分析应用程序。此外,Flink Table API 和 SQL 是高效优化过的,它集成了许多查询优化和算子优化。但并不是所有的优化都是默认开启的,因此对于某些工作负载,可以通过打开某些选项来提高性能。 -在这一页,我们将介绍一些实用的优化选项以及流式聚合的内部原理,它们在某些情况下能带来很大的提升。 +在这一页,我们将介绍一些实用的优化选项以及流式聚合和普通连接的内部原理,它们在某些情况下能带来很大的提升。 {{< hint info >}} 目前 [分组聚合] ({{< ref "docs/dev/table/sql/queries/group-agg" >}}) 和 [窗口表值函数聚合]({{< ref "docs/dev/table/sql/queries/window-agg" >}}) (会话窗口表值函数聚合除外)都支持本页提到的流式聚合优化。 @@ -259,5 +259,33 @@ GROUP BY day Flink SQL 优化器可以识别相同的 distinct key 上的不同过滤器参数。例如,在上面的示例中,三个 COUNT DISTINCT 都在 `user_id` 一列上。Flink 可以只使用一个共享状态实例,而不是三个状态实例,以减少状态访问和状态大小。在某些工作负载下,可以获得显著的性能提升。 +## MiniBatch Regular Joins + +默认情况下,regular join 算子是逐条处理输入的记录,即:(1)根据当前输入记录的 join key 关联对方状态中的记录,(2)根据当前记录写入或者撤回状态中的记录,(3)根据当前的输入记录和关联到的记录输出结果。 +这种处理模式可能会增加 StateBackend 的开销(尤其是对于 RocksDB StateBackend )。除此之外,这会导致严重的中间结果放大。尤其在多级级联 join 的场景,会产生很多的中间结果从而导致性能降低。 + +MiniBatch join 主要解决 regular join 存在的中间结果放大和 StateBackend 开销较大的问题。其核心思想是将一组输入的数据缓存在 join 算子内部的缓冲区中,一旦达到时间阈值或者缓存容量阈值,就触发 join 执行流程。 +这有两个主要的优化点: + +1) 在缓存中折叠数据,以此减少 join 的次数。 +2) 尽最大可能在处理数据时抑制冗余数据下发。 + +以 left join 为例子,左右流的输入都是 join key 包含 unique key 的情况。假设 `id` 为 join key 和 unique key (数字代表 `id`, 字母代表 `content`), 具体 SQL 如下: + +```sql +SET 'table.exec.mini-batch.enabled' = 'true'; +SET 'table.exec.mini-batch.allow-latency' = '5S'; +SET 'table.exec.mini-batch.size' = '5000'; + +SELECT a.id as a_id, a.a_content, b.id as b_id, b.b_content +FROM a LEFT JOIN b +ON a.id = b.id +``` + +针对上述场景,mini-batch join 算子的具体处理过程如下图所示。 + +{{< img src="/fig/table-streaming/minibatch_join.png" width="70%" height="70%" >}} + +默认情况下,对于 regular join 算子来说,mini-batch 优化是被禁用的。开启这项优化,需要设置选项 `table.exec.mini-batch.enabled`、`table.exec.mini-batch.allow-latency` 和 `table.exec.mini-batch.size`。更多详细信息请参见[配置]({{< ref "docs/dev/table/config" >}}#execution-options)页面。 {{< top >}} diff --git a/docs/content/docs/dev/table/tuning.md b/docs/content/docs/dev/table/tuning.md index b78c4687c3655..d6c6c2e9f2c3e 100644 --- a/docs/content/docs/dev/table/tuning.md +++ b/docs/content/docs/dev/table/tuning.md @@ -28,7 +28,7 @@ under the License. SQL is the most widely used language for data analytics. Flink's Table API and SQL enables users to define efficient stream analytics applications in less time and effort. Moreover, Flink Table API and SQL is effectively optimized, it integrates a lot of query optimizations and tuned operator implementations. But not all of the optimizations are enabled by default, so for some workloads, it is possible to improve performance by turning on some options. -In this page, we will introduce some useful optimization options and the internals of streaming aggregation which will bring great improvement in some cases. +In this page, we will introduce some useful optimization options and the internals of streaming aggregation, regular join which will bring great improvement in some cases. {{< hint info >}} The streaming aggregation optimizations mentioned in this page are all supported for [Group Aggregations]({{< ref "docs/dev/table/sql/queries/group-agg" >}}) and [Window TVF Aggregations]({{< ref "docs/dev/table/sql/queries/window-agg" >}}) (except Session Window TVF Aggregation) now. @@ -266,5 +266,39 @@ GROUP BY day Flink SQL optimizer can recognize the different filter arguments on the same distinct key. For example, in the above example, all the three COUNT DISTINCT are on `user_id` column. Then Flink can use just one shared state instance instead of three state instances to reduce state access and state size. In some workloads, this can get significant performance improvements. +## MiniBatch Regular Joins + +By default, regular join operator processes input records one by one, i.e., +(1) lookup associated records from the state of counterpart based on the join key of the current input record, +(2) update the state by adding current input record or retracting it, +(3) output the join results according to the current record and associated records. +This processing pattern may increase the overhead of StateBackend (especially for RocksDB StateBackend). +Besides, this can lead to severe record amplification, especially in cascading join scenarios, generating too many intermediate results and further leading to performance degradation. + +MiniBatch join seeks to resolve the aforementioned issues. Its core idea is to cache a bundle of inputs in a buffer inside of the mini-batch join operator. +Once the buffer reaches a specified size or time threshold, the records are forwarded to the join process. +There are two core optimizations: + +1) fold records in the buffer to reduce the number of data before join process. +2) try best to suppress outputting redundant results when the records in buffer are being processed. + +For example, consider following SQL: + +```sql +SET 'table.exec.mini-batch.enabled' = 'true'; +SET 'table.exec.mini-batch.allow-latency' = '5S'; +SET 'table.exec.mini-batch.size' = '5000'; + +SELECT a.id as a_id, a.a_content, b.id as b_id, b.b_content +FROM a LEFT JOIN b +ON a.id = b.id +``` + +Both the left and right input side have unique key contained by join key which is `id` (assuming the number represents `id`, and letter represents the `content`). +The execution of mini-batch join operator are as shown in the figure below. + +{{< img src="/fig/table-streaming/minibatch_join.png" width="70%" height="70%" >}} + +MiniBatch optimization is disabled by default for regular join. In order to enable this optimization, you should set options `table.exec.mini-batch.enabled`, `table.exec.mini-batch.allow-latency` and `table.exec.mini-batch.size`. Please see [configuration]({{< ref "docs/dev/table/config" >}}#execution-options) page for more details. {{< top >}} diff --git a/docs/static/fig/table-streaming/minibatch_join.png b/docs/static/fig/table-streaming/minibatch_join.png new file mode 100644 index 0000000000000..e4a60d6428dab Binary files /dev/null and b/docs/static/fig/table-streaming/minibatch_join.png differ