
Joins in Flink SQL

Development Technology | 2022-01-23

1. Background

Flink SQL is being used heavily at the major tech companies, and we can't afford to fall behind, so let's dive in.

2. Join types (from the official documentation)

Flink SQL supports complex and flexible join operations over dynamic tables. There are several different types of joins to account for the wide variety of semantics queries may require.

By default, the order of joins is not optimized. Tables are joined in the order in which they are specified in the FROM clause. You can tweak the performance of your join queries by listing the tables with the lowest update frequency first and the tables with the highest update frequency last. Make sure to specify tables in an order that does not yield a cross join (Cartesian product), which is not supported and would cause the query to fail.

2.1 Regular Joins

Regular joins are the most generic type of join, in which any new record or change on either side of the join is visible and affects the entire join result. For example, if there is a new record on the left side, it will be joined with all previous and future records on the right side that have the same product id.
SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id
For streaming queries, regular joins have the most flexible syntax and allow any kind of updating (insert, update, delete) input table. However, this operation has important operational implications: it requires keeping both sides of the join input in Flink state forever. Thus, the state required to compute the query result might grow infinitely, depending on the number of distinct input rows of all input tables and on the intermediate join results. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details.

For streaming queries the required state to compute the query result might grow infinitely depending on the type of aggregation and the number of distinct grouping keys. Please provide a query configuration with valid retention interval to prevent excessive state size. See Query Configuration for details.
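To make the state-growth argument concrete, here is a minimal plain-Python model of a streaming inner equi-join over insert-only inputs. It is not Flink code: the class `RegularInnerJoin`, its `ttl` parameter and the "evict by insertion time" rule are hypothetical simplifications (Flink's real state TTL is set through the query configuration, and its exact expiry rules may differ). It shows that without a TTL both sides are retained forever, and with one a late match can be silently lost.

```python
from collections import defaultdict


class RegularInnerJoin:
    """Toy model of a streaming regular inner equi-join over insert-only inputs.

    Both sides are kept in state keyed by the join key, so each new row is
    matched against every earlier row of the other side. With ttl=None the
    state is never cleaned up; with a ttl (hypothetical, insertion-time based)
    rows older than ttl are evicted, which bounds state but can drop matches.
    """

    def __init__(self, ttl=None):
        self.ttl = ttl
        self.state = {"left": defaultdict(list), "right": defaultdict(list)}

    def _expire(self, now):
        if self.ttl is None:
            return  # unbounded: nothing is ever removed
        for side in self.state.values():
            for key in side:
                side[key] = [(t, row) for t, row in side[key] if now - t <= self.ttl]

    def on_row(self, side, key, row, now):
        """Process one new row and return the join results it produces."""
        self._expire(now)
        other = "right" if side == "left" else "left"
        self.state[side][key].append((now, row))
        matches = [r for _, r in self.state[other][key]]
        if side == "left":
            return [(row, m) for m in matches]
        return [(m, row) for m in matches]

    def state_size(self):
        return sum(len(rows) for side in self.state.values() for rows in side.values())


unbounded = RegularInnerJoin()
bounded = RegularInnerJoin(ttl=10)
for j in (unbounded, bounded):
    j.on_row("left", 1, "order-a", now=0)

print(unbounded.on_row("right", 1, "product-1", now=20))  # [('order-a', 'product-1')]
print(bounded.on_row("right", 1, "product-1", now=20))    # []: order-a was already evicted
```

The unbounded join keeps both rows forever, while the bounded one holds less state but misses the match, which is the correctness trade-off mentioned above.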
2.1.1 INNER Equi-JOIN
Returns a simple Cartesian product restricted by the join condition. Currently, only equi-joins are supported, i.e., joins that have at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported.
SELECT *
FROM Orders
INNER JOIN Product
ON Orders.product_id = Product.id
2.1.2 OUTER Equi-JOIN
Returns all rows in the qualified Cartesian product (i.e., all combined rows that pass its join condition), plus one copy of each row in an outer table for which the join condition did not match with any row of the other table. Flink supports LEFT, RIGHT, and FULL outer joins. Currently, only equi-joins are supported, i.e., joins with at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported.
SELECT *
FROM Orders
LEFT JOIN Product
ON Orders.product_id = Product.id

SELECT *
FROM Orders
RIGHT JOIN Product
ON Orders.product_id = Product.id

SELECT *
FROM Orders
FULL OUTER JOIN Product
ON Orders.product_id = Product.id
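The inner and outer equi-join definitions above can be checked on a tiny batch example. This is a sketch in plain Python, not Flink: `equi_join`, `ids` and the sample rows are hypothetical. The inner result is the Cartesian product filtered by the equality predicate; each outer variant adds the unmatched rows of its preserved side once, padded with `None` in place of SQL NULL.

```python
def equi_join(left, right, lkey, rkey, how="inner"):
    """Toy batch equi-join of two lists of dicts.

    The inner result is the Cartesian product filtered by l[lkey] == r[rkey].
    For "left", "right" and "full", each unmatched row of the preserved side(s)
    is added once, padded with None (SQL NULL) for the other side.
    """
    pairs = [(lrow, rrow) for lrow in left for rrow in right if lrow[lkey] == rrow[rkey]]
    result = list(pairs)
    if how in ("left", "full"):
        result += [(lrow, None) for lrow in left if all(lrow is not p[0] for p in pairs)]
    if how in ("right", "full"):
        result += [(None, rrow) for rrow in right if all(rrow is not p[1] for p in pairs)]
    return result


def ids(rows):
    # Show each joined pair as (order id, product id), None for padding.
    return [(lrow["id"] if lrow else None, rrow["id"] if rrow else None) for lrow, rrow in rows]


orders = [{"id": "o1", "product_id": 1}, {"id": "o2", "product_id": 3}]
product = [{"id": 1, "name": "apple"}, {"id": 2, "name": "pear"}]

for how in ("inner", "left", "right", "full"):
    print(how, ids(equi_join(orders, product, "product_id", "id", how)))
# inner [('o1', 1)]
# left [('o1', 1), ('o2', None)]
# right [('o1', 1), (None, 2)]
# full [('o1', 1), ('o2', None), (None, 2)]
```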

2.2 Interval Joins

Returns a simple Cartesian product restricted by the join condition and a time constraint. An interval join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<, <=, >=, >), a BETWEEN predicate, or a single equality predicate that compares time attributes of the same type (i.e., processing time or event time) of both input tables.

For example, this query joins all orders with their corresponding shipments if the order was shipped within four hours after the order was received (that is, ship_time lies between order_time and order_time + 4 hours).
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time
The following predicates are examples of valid interval join conditions:
  ltime = rtime
  ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
  ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
For streaming queries, in contrast to regular joins, interval joins only support append-only tables with time attributes. Since time attributes are quasi-monotonically increasing, Flink can remove old values from its state without affecting the correctness of the result.
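The following plain-Python sketch mirrors the BETWEEN condition from the Orders/Shipments example and shows why bounded time lets state be dropped. The helper names are hypothetical, and it assumes the usual watermark contract (no later row carries a timestamp <= the current watermark); Flink's actual cleanup is timer-driven and its exact timing may differ.

```python
from datetime import datetime, timedelta

BOUND = timedelta(hours=4)


def interval_match(order_time, ship_time):
    """Mirror of: o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time."""
    return ship_time - BOUND <= order_time <= ship_time


def can_evict_order(order_time, watermark):
    # Future shipments have ship_time > watermark; a match needs
    # ship_time <= order_time + 4h, so once watermark >= order_time + 4h
    # this order can never match again.
    return watermark >= order_time + BOUND


def can_evict_shipment(ship_time, watermark):
    # Future orders have order_time > watermark; a match needs
    # order_time <= ship_time, so once watermark >= ship_time it is done.
    return watermark >= ship_time


t0 = datetime(2022, 1, 23, 8, 0)
print(interval_match(t0, t0 + timedelta(hours=3)))    # True: shipped 3 h after ordering
print(interval_match(t0, t0 + timedelta(hours=5)))    # False: shipped too late
print(interval_match(t0, t0 - timedelta(minutes=1)))  # False: shipped before ordering
```

Because both eviction tests depend only on the watermark, state stays bounded, which is exactly what the regular join above cannot guarantee.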

2.3 Temporal Joins

A temporal table is a table that evolves over time, otherwise known in Flink as a dynamic table. Rows in a temporal table are associated with one or more temporal periods, and all Flink tables are temporal (dynamic). A temporal table contains one or more versioned table snapshots. It can be a changing history table that tracks the changes (e.g., a database changelog, which contains all snapshots) or a changing dimension table that materializes the changes (e.g., a database table that contains only the latest snapshot).
2.3.1 Event Time Temporal Join
Event-time temporal joins allow joining against a versioned table. This means a table can be enriched with changing metadata, retrieving the metadata value as it was at a certain point in time.

Temporal joins take an arbitrary table (left input/probe side) and correlate each row with the relevant version of the corresponding row in the versioned table (right input/build side). Flink uses the FOR SYSTEM_TIME AS OF syntax from the SQL:2011 standard to perform this operation. The syntax of a temporal join is as follows:
SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1
With an event-time attribute (i.e., a rowtime attribute), it is possible to retrieve the value of a key as it was at some point in the past. This allows for joining the two tables at a common point in time. The versioned table will store all versions - identified by time - since the last watermark.

For example, suppose we have a table of orders, each with prices in different currencies. To properly normalize this table to a single currency, such as USD, each order needs to be joined with the proper currency conversion rate from the point-in-time when the order was placed.
-- Create a table of orders. This is a standard
-- append-only dynamic table.
CREATE TABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    currency    STRING,
    order_time  TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time
) WITH (/* ... */);

-- Define a versioned table of currency rates.
-- This could come from a change-data-capture tool
-- such as Debezium, a compacted Kafka topic, or any other
-- way of defining a versioned table.
CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    -- metadata key of the debezium-json format, exposed by the Kafka connector with a 'value.' prefix
    update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
    WATERMARK FOR update_time AS update_time,
    PRIMARY KEY(currency) NOT ENFORCED
) WITH (
   'connector' = 'kafka',
   'value.format' = 'debezium-json',
   /* ... */
);

SELECT 
     order_id,
     price,
     currency,
     conversion_rate,
     order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;

order_id  price  currency  conversion_rate  order_time
========  =====  ========  ===============  =========
o_001     11.11  EUR       1.14             12:00:00
o_002     12.51  EUR       1.10             12:06:00
Note: The event-time temporal join is triggered by a watermark from the left and right sides; please ensure both sides of the join have set watermark correctly.

Note: The event-time temporal join requires the primary key of the versioned table to be contained in the equality condition of the temporal join condition. For example, the primary key currency_rates.currency of table currency_rates must be constrained in the condition orders.currency = currency_rates.currency.
In contrast to regular joins, previous temporal join results are not affected by later changes on the build side. Compared to interval joins, temporal table joins do not define a time window within which records are joined. Records from the probe side are always joined with the build side's version at the time specified by the time attribute. Thus, rows on the build side might be arbitrarily old. As time passes, versions of a record (for a given primary key) that are no longer needed are removed from the state.
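The version lookup behind FOR SYSTEM_TIME AS OF can be sketched in plain Python as "latest version whose time is <= the probe row's time". `VersionedTable` is a hypothetical toy, not a Flink class, and the EUR rate history (1.14 from 11:00, 1.10 from 12:05) is invented here only so that the output matches the result table above; the article does not show the actual history. `expire` models the cleanup described above, assuming no later probe row has a time <= the watermark.

```python
import bisect
from datetime import time
from decimal import Decimal


class VersionedTable:
    """Toy versioned table: per primary key, versions sorted by their time."""

    def __init__(self):
        self.versions = {}  # key -> (sorted version times, values in the same order)

    def upsert(self, key, at, value):
        times, values = self.versions.setdefault(key, ([], []))
        i = bisect.bisect_right(times, at)
        times.insert(i, at)
        values.insert(i, value)

    def as_of(self, key, at):
        """Version valid at `at`: the latest one whose time is <= at, else None."""
        times, values = self.versions.get(key, ([], []))
        i = bisect.bisect_right(times, at)
        return values[i - 1] if i > 0 else None  # None plays the role of NULL in a LEFT JOIN

    def expire(self, watermark):
        """Drop versions no future probe row can need: keep the version valid
        at the watermark and everything newer."""
        for times, values in self.versions.values():
            keep_from = max(bisect.bisect_right(times, watermark) - 1, 0)
            del times[:keep_from]
            del values[:keep_from]


rates = VersionedTable()
rates.upsert("EUR", time(11, 0), Decimal("1.14"))  # hypothetical rate history
rates.upsert("EUR", time(12, 5), Decimal("1.10"))

orders = [("o_001", Decimal("11.11"), "EUR", time(12, 0)),
          ("o_002", Decimal("12.51"), "EUR", time(12, 6))]
for order_id, price, currency, order_time in orders:
    print(order_id, price, currency, rates.as_of(currency, order_time), order_time)
# o_001 11.11 EUR 1.14 12:00:00
# o_002 12.51 EUR 1.10 12:06:00
```

Adding a newer EUR version later does not change what `as_of` returns for 12:00, which is the "earlier results are not affected" property.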
2.3.2 Processing Time Temporal Join
A processing time temporal table join uses a processing-time attribute to correlate rows to the latest version of a key in an external versioned table.

By definition, with a processing-time attribute, the join always returns the most up-to-date value for a given key. One can think of a lookup table as a simple HashMap<K, V> that stores all the records from the build side. The power of this join is that it allows Flink to work directly against external systems when it is not feasible to materialize the table as a dynamic table within Flink.

The following processing-time temporal table join example shows an append-only table Orders that should be joined with the table LatestRates. LatestRates is a dimension table (e.g., an HBase table) that is materialized with the latest rates. At times 10:15, 10:30, and 10:52, the content of LatestRates looks as follows:
10:15> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1

10:30> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1

10:52> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        116     <==== changed from 114 to 116
Yen           1
The content of LatestRates is the same at 10:15 and 10:30. The Euro rate changed from 114 to 116 at 10:52.

Orders is an append-only table representing payments for the given amount and the given currency. For example, at 10:15 there was an order for an amount of 2 Euro.
SELECT * FROM Orders;

amount currency
====== =========
     2 Euro             <== arrived at time 10:15
     1 US Dollar        <== arrived at time 10:30
     2 Euro             <== arrived at time 10:52
Given these tables, we would like to calculate all Orders converted to a common currency.
amount currency     rate   amount*rate
====== ========= ======= ============
     2 Euro          114          228    <== arrived at time 10:15
     1 US Dollar     102          102    <== arrived at time 10:30
     2 Euro          116          232    <== arrived at time 10:52
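The HashMap<K, V> view of the processing-time join can be replayed on exactly the data above. This is a plain-Python sketch, not Flink code; `processing_time_join` and the event list are hypothetical, with each order simply reading whatever rate is current when it is processed. Swapping the last two events shows why the result is not deterministic under processing time.

```python
latest_rates = {"US Dollar": 102, "Euro": 114, "Yen": 1}  # LatestRates before 10:52

events = [
    ("10:15", "order", (2, "Euro")),
    ("10:30", "order", (1, "US Dollar")),
    ("10:52", "rate", ("Euro", 116)),   # the Euro rate changes from 114 to 116
    ("10:52", "order", (2, "Euro")),
]


def processing_time_join(events, initial_rates):
    """Join each order with whatever rate is current when it is processed."""
    rates = dict(initial_rates)         # build side: key -> latest value only
    joined = []
    for _proc_time, kind, payload in events:
        if kind == "rate":
            currency, rate = payload
            rates[currency] = rate      # overwrite: older versions are not kept
        else:
            amount, currency = payload
            rate = rates[currency]      # HashMap<K, V>-style point lookup
            joined.append((amount, currency, rate, amount * rate))
    return joined


for row in processing_time_join(events, latest_rates):
    print(row)
# (2, 'Euro', 114, 228)
# (1, 'US Dollar', 102, 102)
# (2, 'Euro', 116, 232)

# If the 10:52 order happens to be processed just before the rate update,
# it picks up the old rate: the result depends on processing order.
swapped = events[:2] + [events[3], events[2]]
print(processing_time_join(swapped, latest_rates)[-1])  # (2, 'Euro', 114, 228)
```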
Currently, the FOR SYSTEM_TIME AS OF syntax for a temporal join with the latest version of an arbitrary view/table is not supported yet. You can use the temporal table function syntax instead:
SELECT
  o_amount, r_rate
FROM
  Orders,
  LATERAL TABLE (Rates(o_proctime))
WHERE
  r_currency = o_currency
Note: The FOR SYSTEM_TIME AS OF syntax for a temporal join with the latest version of an arbitrary table/view is not supported purely for semantic reasons: the join processing for the left stream does not wait for a complete snapshot of the temporal table, which may mislead users in production environments. The processing-time temporal join via a temporal table function has the same semantic problem, but it has existed for a long time, so it is still supported for compatibility.

With processing time, the result is not deterministic. The processing-time temporal join is most often used to enrich a stream with an external table (i.e., a dimension table).

In contrast to regular joins, previous temporal table results are not affected by later changes on the build side. Compared to interval joins, temporal table joins do not define a time window within which records are joined, i.e., old rows are not stored in state.
2.3.3 Temporal Table Function Join
The syntax to join a table with a [temporal table function](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/concepts/temporal_table_function/) is the same as in Join with [Table Function](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/joins/#table-function).

Note: Currently only inner join and left outer join with temporal tables are supported.

Assuming Rates is a temporal table function, the join can be expressed in SQL as follows:
SELECT
  o_amount, r_rate
FROM
  Orders,
  LATERAL TABLE (Rates(o_proctime))
WHERE
  r_currency = o_currency
The main differences between the temporal table DDL above and the temporal table function are:

The temporal table DDL can be defined in SQL, but a temporal table function cannot;

Both the temporal table DDL and the temporal table function support temporal joins against a versioned table, but only the temporal table function can temporal-join the latest version of an arbitrary table/view.

2.4 Lookup Join

A lookup join is typically used to enrich a table with data that is queried from an external system. The join requires one table to have a processing time attribute and the other table to be backed by a lookup source connector.

The lookup join uses the Processing Time Temporal Join syntax shown above, with the right table backed by a lookup source connector.

The following example shows the syntax to specify a lookup join.
-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
  'table-name' = 'customers'
);

-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;
In the example above, the Orders table is enriched with data from the Customers table, which resides in a MySQL database. The FOR SYSTEM_TIME AS OF clause with the subsequent processing-time attribute ensures that each row of the Orders table is joined with those Customers rows that match the join predicate at the point in time when the Orders row is processed by the join operator. It also prevents the join result from being updated when a joined Customers row is updated in the future. The lookup join also requires a mandatory equality join predicate, in the example above o.customer_id = c.id.
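A minimal plain-Python model of the behaviour described above: one point query per order on the equality predicate, issued at processing time, with the emitted row acting as a snapshot. `customers_db` stands in for the MySQL table and, like `lookup_join`, is hypothetical; real connectors may add caching, which this sketch ignores.

```python
# Hypothetical contents of the MySQL `customers` table, keyed by id.
customers_db = {
    1: {"name": "Alice", "country": "DE", "zip": "10115"},
    2: {"name": "Bob", "country": "US", "zip": "94105"},
}


def lookup_join(order, db):
    """Inner lookup join of one order row: a point query on c.id = o.customer_id
    issued when the order is processed. Returns None when nothing matches."""
    customer = db.get(order["customer_id"])
    if customer is None:
        return None  # inner JOIN: the order produces no output row
    # Copy the values out: the emitted row is a snapshot, not a live reference.
    return (order["order_id"], order["total"], customer["country"], customer["zip"])


emitted = []
emitted.append(lookup_join({"order_id": "o1", "total": 10, "customer_id": 1}, customers_db))
customers_db[1]["zip"] = "10117"  # a later update in MySQL...
emitted.append(lookup_join({"order_id": "o2", "total": 5, "customer_id": 1}, customers_db))
emitted.append(lookup_join({"order_id": "o3", "total": 7, "customer_id": 9}, customers_db))
emitted = [row for row in emitted if row is not None]
print(emitted)
# [('o1', 10, 'DE', '10115'), ('o2', 5, 'DE', '10117')]
```

The update only affects orders processed after it; the row already emitted for o1 keeps the old zip code.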

2.5 Table Function

Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. [User-defined table functions](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/functions/udfs/#table-functions) must be registered before use.
2.5.1 INNER JOIN

The row of the left (outer) table is dropped if its table function call returns an empty result.

SELECT order_id, res
FROM Orders,
LATERAL TABLE(table_func(order_id)) t(res)
2.5.2 LEFT OUTER JOIN

If a table function call returns an empty result, the corresponding outer row is preserved, and the result padded with null values. Currently, a left outer join against a lateral table requires a TRUE literal in the ON clause.

SELECT order_id, res
FROM Orders
LEFT OUTER JOIN LATERAL TABLE(table_func(order_id)) t(res)
  ON TRUE
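The difference between the two LATERAL TABLE forms can be modelled in a few lines of plain Python. `table_func` and its `ITEMS` data are hypothetical stand-ins for a registered user-defined table function; `lateral_join` calls it once per outer row and either drops (inner) or NULL-pads (left outer, `ON TRUE`) rows whose call returns nothing.

```python
ITEMS = {"o1": ["apple", "pear"], "o2": ["milk"]}  # hypothetical data


def table_func(order_id):
    """Hypothetical table function: one output row per item of the order."""
    return ITEMS.get(order_id, [])


def lateral_join(order_ids, func, outer=False):
    """Join each outer row with every row its own function call produces."""
    out = []
    for order_id in order_ids:
        rows = func(order_id)
        if rows:
            out.extend((order_id, res) for res in rows)
        elif outer:
            out.append((order_id, None))  # LEFT OUTER JOIN ... ON TRUE: keep row, pad with NULL
        # inner join: the outer row is dropped when the call returns nothing
    return out


order_ids = ["o1", "o2", "o3"]
print(lateral_join(order_ids, table_func))
# [('o1', 'apple'), ('o1', 'pear'), ('o2', 'milk')]
print(lateral_join(order_ids, table_func, outer=True))
# [('o1', 'apple'), ('o1', 'pear'), ('o2', 'milk'), ('o3', None)]
```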