新聞中心
SQL 的時間語義

hello,我是老羊,今天跟著老羊的思路學習 Flink SQL 的時間語義:
- 與離線處理中常見的時間分區(qū)字段一樣,在實時處理中,時間屬性也是一個核心概念。Flink 支持 處理時間、事件時間、攝入時間 三種時間語義。
- 下文會分別介紹三種時間語義的應用場景及案例。三種時間在生產(chǎn)環(huán)境的使用頻次 事件時間(SQL 常用) > 處理時間(SQL 幾乎不用,DataStream 少用) > 攝入時間(不用)
一、Flink 三種時間屬性簡介
time
- 事件時間:指的是數(shù)據(jù)本身攜帶的時間,這個時間是在事件產(chǎn)生時的時間,而且在 Flink SQL 觸發(fā)計算時,也使用數(shù)據(jù)本身攜帶的時間。這就叫做 事件時間。目前生產(chǎn)環(huán)境中用的最多。
- 處理時間:指的是具體算子計算數(shù)據(jù)執(zhí)行時的機器時間(例如在算子中 Java 取 System.currentTimeMillis()) ),在生產(chǎn)環(huán)境中用的次多。
- 攝入時間:指的是數(shù)據(jù)從數(shù)據(jù)源進入 Flink 的時間。攝入時間用的最少,可以說基本不使用。
小伙伴萌要注意到:
- 上述的三種時間概念不是由于有了數(shù)據(jù)而誕生的,而是有了 Flink 之后根據(jù)實際的應用場景而誕生的。以事件時間舉個例子,如果只是數(shù)據(jù)攜帶了時間,F(xiàn)link 也消費了這個數(shù)據(jù),但是在 Flink 中沒有使用數(shù)據(jù)的這個時間作為計算的觸發(fā)條件,也不能把這個 Flink 任務叫做事件時間的任務。
- 其次,要認識到,一般一個 Flink 任務只會有一個時間屬性,所以時間屬性通常認為是一個任務粒度的。舉例:我們可以說 A 任務是事件時間語義的任務,B 任務是處理時間語義的任務。當然了,一個任務也可以存在多個時間屬性。
二、Flink 三種時間屬性的應用場景
講到這里,xdm 會問,博主上面寫的 3 種時間屬性到底對我們的任務有啥影響呢?3 種時間屬性的應用場景是啥?
先說結(jié)論,在 Flink 中時間的作用:
- 主要體現(xiàn)在包含時間窗口的計算中:用于標識任務的時間進度,來判斷是否需要觸發(fā)窗口的計算。比如常用的滾動窗口、滑動窗口等都需要時間推動觸發(fā)。這些窗口的應用場景后續(xù)會詳細介紹。
- 次要體現(xiàn)在自定義時間語義的計算中:舉個例子,比如用戶可以自定義每隔 10s 的本地時間,或者消費到的數(shù)據(jù)的時間戳每增大 10s,就把計算結(jié)果輸出一次,時間在此類應用中也是一種標識任務進度的作用。
博主以 滾動窗口 的聚合任務為例來介紹一下事件時間和處理時間的對比區(qū)別。
1. 事件時間案例:還是以之前的 clicks 表拿來舉例。
tumble window
上面這個案例的窗口大小是 1 小時,需求方需要按照用戶點擊時間戳 cTime 劃分數(shù)據(jù)(劃分滾動窗口),然后計算出 count 聚合結(jié)果(這樣計算能反映出事件的真實發(fā)生時間),那么就需要把 cTime 設置為窗口的劃分時間戳,即代碼中 tumble(cTime, interval '1' hour)。
上面這種就叫做事件時間。即用數(shù)據(jù)中自帶的時間戳進行窗口的劃分(點擊操作真實的發(fā)生時間)。
后續(xù) Flink SQL 任務在運行的過程中也會實際按照 cTime 的當前時間作為一小時窗口結(jié)束觸發(fā)條件并計算一個小時窗口內(nèi)的數(shù)據(jù)。
2.處理時間案例:還是以之前的 clicks 表拿來舉例。
還是上面那個案例,但是這次需求方不需要按照數(shù)據(jù)上的時間戳劃分數(shù)據(jù)(劃分滾動窗口),只需要數(shù)據(jù)來了之后, 在 Flink 機器上的時間作為一小時窗口結(jié)束的書法條件并計算。
那么這種觸發(fā)機制就是處理時間。
3. 攝入時間案例:在 Flink 從外部數(shù)據(jù)源讀取到數(shù)據(jù)時,給這條數(shù)據(jù)帶上的當前數(shù)據(jù)源算子的本地時間戳。下游可以用這個時間戳進行窗口聚合,不過這種幾乎不使用。
三、SQL 指定時間屬性的兩種方式
如果要滿足 Flink SQL 時間窗口類的聚合操作,SQL 或 Table API 中的 數(shù)據(jù)源表 就需要提供時間屬性(相當于我們把這個時間屬性在 數(shù)據(jù)源表 上面進行聲明),以及支持時間相關的操作。
那么來看看 Flink SQL 為我們提供的兩種指定時間戳的方式:
- CREATE TABLE DDL 創(chuàng)建表的時候指定。
- 可以在 DataStream 中指定,在后續(xù)的 DataStream 轉(zhuǎn)的 Table 中使用。
一旦時間屬性定義好,它就可以像普通列一樣使用,也可以在時間相關的操作中使用。
四、SQL 事件時間案例
來看看 Flink 中如何指定事件時間。
1. CREATE TABLE DDL 指定時間戳的方式。
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- 使用下面這句來將 user_action_time 聲明為事件時間,并且聲明 watermark 的生成規(guī)則,即 user_action_time 減 5 秒
-- 事件時間列的字段類型必須是 TIMESTAMP 或者 TIMESTAMP_LTZ 類型
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
從上面這條語句可以看到,如果想使用事件時間,那么我們的時間戳類型必須是 TIMESTAMP 或者 TIMESTAMP_LTZ 類型。很多小伙伴會想到,我們的時間戳一般不都是秒或者是毫秒(BIGINT 類型)嘛,那這種情況怎么辦?
解決方案必須要有啊。如下。
CREATE TABLE user_actions (
user_name STRING,
data STRING,
-- 1. 這個 ts 就是常見的毫秒級別時間戳
ts BIGINT,
-- 2. 將毫秒時間戳轉(zhuǎn)換成 TIMESTAMP_LTZ 類型
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
-- 3. 使用下面這句來將 user_action_time 聲明為事件時間,并且聲明 watermark 的生成規(guī)則,即 user_action_time 減 5 秒
-- 事件時間列的字段類型必須是 TIMESTAMP 或者 TIMESTAMP_LTZ 類型
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
...
);
SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
2.DataStream 中指定事件時間。
之前介紹了 Table 和 DataStream 可以互轉(zhuǎn),那么 Flink 也提供了一個能力,就是在 Table 轉(zhuǎn)為 DataStream 時,指定時間戳字段。如下案例:
public class DataStreamSourceEventTimeTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
// 1. 分配 watermark
DataStream r = env.addSource(new UserDefinedSource())
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.minutes(0L)) {
@Override
public long extractTimestamp(Row element) {
return (long) element.getField("f2");
}
});
// 2. 使用 f2.rowtime 的方式將 f2 字段指為事件時間時間戳
Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2.rowtime");
tEnv.createTemporaryView("source_table", sourceTable);
// 3. 在 tumble window 中使用 f2
String tumbleWindowSql =
"SELECT TUMBLE_START(f2, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"
+ "FROM source_table\n"
+ "GROUP BY TUMBLE(f2, INTERVAL '5' SECOND)"
;
Table resultTable = tEnv.sqlQuery(tumbleWindowSql);
tEnv.toDataStream(resultTable, Row.class).print();
env.execute();
}
private static class UserDefinedSource implements SourceFunction, ResultTypeQueryable {
private volatile boolean isCancel;
@Override
public void run(SourceContext sourceContext) throws Exception {
int i = 0;
while (!this.isCancel) {
sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));
Thread.sleep(10L);
i++;
}
}
@Override
public void cancel() {
this.isCancel = true;
}
@Override
public TypeInformation getProducedType() {
return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),
TypeInformation.of(Long.class));
}
}
}
五、SQL 處理時間案例
來看看 Flink SQL 中如何指定處理時間。
1.CREATE TABLE DDL 指定時間戳的方式。
CREATE TABLE user_actions (
user_name STRING,
data STRING,
-- 使用下面這句來將 user_action_time 聲明為處理時間
user_action_time AS PROCTIME()
) WITH (
...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
? DataStream 中指定處理時間。
public class DataStreamSourceProcessingTimeTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
// 1. 分配 watermark
DataStream r = env.addSource(new UserDefinedSource());
// 2. 使用 proctime.proctime 的方式將 f2 字段指為處理時間時間戳
Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2, proctime.proctime");
tEnv.createTemporaryView("source_table", sourceTable);
// 3. 在 tumble window 中使用 f2
String tumbleWindowSql =
"SELECT TUMBLE_START(proctime, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"
+ "FROM source_table\n"
+ "GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND)"
;
Table resultTable = tEnv.sqlQuery(tumbleWindowSql);
tEnv.toDataStream(resultTable, Row.class).print();
env.execute();
}
private static class UserDefinedSource implements SourceFunction, ResultTypeQueryable {
private volatile boolean isCancel;
@Override
public void run(SourceContext sourceContext) throws Exception {
int i = 0;
while (!this.isCancel) {
sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));
Thread.sleep(10L);
i++;
}
}
@Override
public void cancel() {
this.isCancel = true;
}
@Override
public TypeInformation getProducedType() {
return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),
TypeInformation.of(Long.class));
}
}
}
新聞標題:Flink SQL 知其所以然:SQL 的時間語義!
當前URL:http://fisionsoft.com.cn/article/cdscjei.html


咨詢
建站咨詢
