01 引言
在前面的博客,我们学习了Flink
的TableAP和SQL
。
本文主要讲的是Flink Table
与SQL
的一些案例。
02 Flink Table&SQL 案例
2.1 案例1(DataStream SQL统计)
需求:将DataStream
注册为Table
和View
并进行SQL
统计。
代码如下:
import static org.apache.flink.table.api.Expressions.$;
/**
* 案例1:将DataStream注册为Table和View并进行SQL统计
*
* @author : YangLinWei
* @createTime: 2022/3/8 2:07 下午
*/
public class Demo1 {
public static void main(String[] args) throws Exception {
//1.准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//2.Source
DataStream<Order> orderA = env.fromCollection(Arrays.asList(
new Order(1L, "beer", 3),
new Order(1L, "diaper", 4),
new Order(3L, "rubber", 2)));
DataStream<Order> orderB = env.fromCollection(Arrays.asList(
new Order(2L, "pen", 3),
new Order(2L, "rubber", 3),
new Order(4L, "beer", 1)));
//3.注册表
// convert DataStream to Table
Table tableA = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"));
// register DataStream as Table
tEnv.createTemporaryView("OrderB", orderB, $("user"), $("product"), $("amount"));
//4.执行查询
System.out.println(tableA);
// union the two tables
Table resultTable = tEnv.sqlQuery(
"SELECT * FROM " + tableA + " WHERE amount > 2 " +
"UNION ALL " +
"SELECT * FROM OrderB WHERE amount < 2"
);
//5.输出结果
DataStream<Order> resultDS = tEnv.toAppendStream(resultTable, Order.class);
resultDS.print();
env.execute();
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Order {
public Long user;
public String product;
public int amount;
}
}
运行结果:
2.2 案例2(DataStream Table&SQL统计)
需求:使用SQL
和Table
两种方式对DataStream
中的单词进行统计。
示例代码如下(SQL方式):
import static org.apache.flink.table.api.Expressions.$;
/**
* 使用SQL和Table两种方式对DataStream中的单词进行统计。
*
* @author : YangLinWei
* @createTime: 2022/3/8 2:13 下午
*/
public class Demo02 {
public static void main(String[] args) throws Exception {
//1.准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//2.Source
DataStream<WC> input = env.fromElements(
new WC("Hello", 1),
new WC("World", 1),
new WC("Hello", 1)
);
//3.注册表
tEnv.createTemporaryView("WordCount", input, $("word"), $("frequency"));
//4.执行查询
Table resultTable = tEnv.sqlQuery("SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
//5.输出结果
//toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate
//DataStream<WC> resultDS = tEnv.toAppendStream(resultTable, WC.class);
DataStream<Tuple2<Boolean, WC>> resultDS = tEnv.toRetractStream(resultTable, WC.class);
resultDS.print();
env.execute();
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class WC {
public String word;
public long frequency;
}
}
运行结果:
示例代码如下(Table方式):
/**
* 使用SQL和Table两种方式对DataStream中的单词进行统计
*
* @author : YangLinWei
* @createTime: 2022/3/8 2:15 下午
*/
public class Demo02Table {
public static void main(String[] args) throws Exception {
//1.准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//2.Source
DataStream<WC> input = env.fromElements(
new WC("Hello", 1),
new WC("World", 1),
new WC("Hello", 1)
);
//3.注册表
Table table = tEnv.fromDataStream(input);
//4.执行查询
Table resultTable = table
.groupBy($("word"))
.select($("word"), $("frequency").sum().as("frequency"))
.filter($("frequency").isEqual(2));
//5.输出结果
DataStream<Tuple2<Boolean, WC>> resultDS = tEnv.toRetractStream(resultTable, WC.class);
resultDS.print();
env.execute();
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class WC {
public String word;
public long frequency;
}
}
2.3 案例3(SQL与滚动窗口)
需求:使用Flink SQL
来统计5秒内 每个用户的订单总数、订单的最大金额、订单的最小金额,也就是每隔5秒统计最近5秒的每个用户的订单总数、订单的最大金额、订单的最小金额。
上面的需求使用流处理的Window
的基于时间的滚动窗口就可以搞定!
编码步骤:
1、 创建环境;
2、 使用自定义函数模拟实时流数据;
3、 设置事件时间和Watermaker
;
4、 注册表;
5、 执行sql
-可以使用sql
风格或table
风格;
6、 输出结果;
7、 触发执行;
SQL方式实现:
/**
* SQL方式实现
*
* @author : YangLinWei
* @createTime: 2022/3/8 2:19 下午
*/
public class Demo3SQL {
public static void main(String[] args) throws Exception {
//1.准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//2.Source
DataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() {
private Boolean isRunning = true;
@Override
public void run(SourceContext<Order> ctx) throws Exception {
Random random = new Random();
while (isRunning) {
Order order = new Order(UUID.randomUUID().toString(), random.nextInt(3), random.nextInt(101), System.currentTimeMillis());
TimeUnit.SECONDS.sleep(1);
ctx.collect(order);
}
}
@Override
public void cancel() {
isRunning = false;
}
});
//3.Transformation
DataStream<Order> watermakerDS = orderDS
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner((event, timestamp) -> event.getCreateTime())
);
//4.注册表
tEnv.createTemporaryView("t_order", watermakerDS,
$("orderId"), $("userId"), $("money"), $("createTime").rowtime());
//5.执行SQL
String sql = "select " +
"userId," +
"count(*) as totalCount," +
"max(money) as maxMoney," +
"min(money) as minMoney " +
"from t_order " +
"group by userId," +
"tumble(createTime, interval '5' second)";
Table ResultTable = tEnv.sqlQuery(sql);
//6.Sink
//将SQL的执行结果转换成DataStream再打印出来
//toAppendStream → 将计算后的数据append到结果DataStream中去
//toRetractStream → 将计算后的新的数据在DataStream原数据的基础上更新true或是删除false
DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(ResultTable, Row.class);
resultDS.print();
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Order {
private String orderId;
private Integer userId;
private Integer money;
private Long createTime;
}
}
Table实现方式:
/**
* Table方式
*
* @author : YangLinWei
* @createTime: 2022/3/8 2:26 下午
*/
public class Demo3Table {
public static void main(String[] args) throws Exception {
//1.准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//2.Source
DataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() {
private Boolean isRunning = true;
@Override
public void run(SourceContext<Order> ctx) throws Exception {
Random random = new Random();
while (isRunning) {
Order order = new Order(UUID.randomUUID().toString(), random.nextInt(3), random.nextInt(101), System.currentTimeMillis());
TimeUnit.SECONDS.sleep(1);
ctx.collect(order);
}
}
@Override
public void cancel() {
isRunning = false;
}
});
//3.Transformation
DataStream<Order> watermakerDS = orderDS
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner((event, timestamp) -> event.getCreateTime())
);
//4.注册表
tEnv.createTemporaryView("t_order", watermakerDS,
$("orderId"), $("userId"), $("money"), $("createTime").rowtime());
//查看表约束
tEnv.from("t_order").printSchema();
//5.TableAPI查询
Table ResultTable = tEnv.from("t_order")
//.window(Tumble.over("5.second").on("createTime").as("tumbleWindow"))
.window(Tumble.over(lit(5).second())
.on($("createTime"))
.as("tumbleWindow"))
.groupBy($("tumbleWindow"), $("userId"))
.select(
$("userId"),
$("userId").count().as("totalCount"),
$("money").max().as("maxMoney"),
$("money").min().as("minMoney"));
//6.将SQL的执行结果转换成DataStream再打印出来
DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(ResultTable, Row.class);
resultDS.print();
//7.excute
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Order {
private String orderId;
private Integer userId;
private Integer money;
private Long createTime;
}
}
2.4 案例4(SQL消费Kafka)
需求:从Kafka
中消费数据并过滤出状态为success
的数据再写入到Kafka
{
"user_id": "1", "page_id":"1", "status": "success"}
{
"user_id": "1", "page_id":"1", "status": "success"}
{
"user_id": "1", "page_id":"1", "status": "success"}
{
"user_id": "1", "page_id":"1", "status": "success"}
{
"user_id": "1", "page_id":"1", "status": "fail"}
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic input_kafka
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic output_kafka
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic input_kafka
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic output_kafka --from-beginning
代码实现:
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html
/**
* 从Kafka中消费数据并过滤出状态为success的数据再写入到Kafka
*
* @author : YangLinWei
* @createTime: 2022/3/8 2:30 下午
*/
public class Demo4 {
public static void main(String[] args) throws Exception {
//1.准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//2.Source
TableResult inputTable = tEnv.executeSql(
"CREATE TABLE input_kafka (\n" +
" user_id BIGINT,\n" +
" page_id BIGINT,\n" +
" status STRING\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'input_kafka',\n" +
" 'properties.bootstrap.servers' = 'node1:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'format' = 'json'\n" +
")"
);
TableResult outputTable = tEnv.executeSql(
"CREATE TABLE output_kafka (\n" +
" user_id BIGINT,\n" +
" page_id BIGINT,\n" +
" status STRING\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'output_kafka',\n" +
" 'properties.bootstrap.servers' = 'node1:9092',\n" +
" 'format' = 'json',\n" +
" 'sink.partitioner' = 'round-robin'\n" +
")"
);
String sql = "select " +
"user_id," +
"page_id," +
"status " +
"from input_kafka " +
"where status = 'success'";
Table ResultTable = tEnv.sqlQuery(sql);
DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(ResultTable, Row.class);
resultDS.print();
tEnv.executeSql("insert into output_kafka select * from " + ResultTable);
//7.excute
env.execute();
}
}
03 Flink SQL常用算子
3.1 SELECT
SELECT :用于从 DataSet/DataStream
中选择数据,用于筛选出某些列。
示例:
- SELECT * FROM Table;// 取出表中的所有列
- SELECT name,age FROM Table;// 取出表中 name 和 age两列
与此同时 SELECT
语句中可以使用函数和别名,例如我们上面提到的 WordCount
中:
SELECT word, COUNT(word) FROM table GROUP BY word;
3.2 WHERE
WHERE :用于从数据集/流中过滤数据,与 SELECT
一起使用,用于根据某些条件对关系做水平分割,即选择符合条件的记录。
示例:
- SELECT name,age FROM Table where name LIKE ‘% 小明 %’;
- SELECT * FROM Table WHERE age = 20;
WHERE
是从原数据中进行过滤,那么在WHERE
条件中,Flink SQL
同样支持 =、<、>、<>、>=、<=
,以及 AND
、OR
等表达式的组合,最终满足过滤条件的数据会被选择出来。并且 WHERE
可以结合IN、NOT IN
联合使用。举个例子:
SELECT name, age
FROM Table
WHERE name IN (SELECT name FROM Table2)
3.3 DISTINCT
DISTINCT: 用于从数据集/流中去重根据 SELECT
的结果进行去重。
示例:
SELECT DISTINCT name FROM Table;
对于流式查询,计算查询结果所需的 State
可能会无限增长,用户需要自己控制查询的状态范围,以防止状态过大。
3.4 GROUP BY
GROUP BY :是对数据进行分组操作。例如我们需要计算成绩明细表中,每个学生的总分。
示例:
SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name;
3.5 UNION 和 UNION ALL
UNION: 用于将两个结果集合并起来,要求两个结果集字段完全一致,包括字段类型、字段顺序。不同于 UNION ALL 的是,UNION 会对结果数据去重。
示例:
SELECT * FROM T1 UNION (ALL) SELECT * FROM T2;
3.6 JOIN
JOIN :用于把来自两个表的数据联合起来形成结果表,Flink
支持的JOIN
类型包括:
- JOIN - INNER JOIN
- LEFT JOIN - LEFT OUTER JOIN
- RIGHT JOIN - RIGHT OUTER JOIN
- FULL JOIN - FULL OUTER JOIN
这里的JOIN
的语义和我们在关系型数据库中使用的 JOIN
语义一致。
示例:JOIN
(将订单表数据和商品表进行关联)
SELECT * FROM Orders INNER JOIN Product ON Orders.productId = Product.id
LEFT JOIN
与 JOIN
的区别是当右表没有与左边相 JOIN
的数据时候,右边对应的字段补NULL
输出,RIGHT JOIN
相当于LEFT JOIN
左右两个表交互一下位置。FULL JOIN
相当于RIGHT JOIN
和 LEFT JOIN
之后进行UNION ALL
操作,示例:
SELECT * FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
SELECT * FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id
SELECT * FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id
3.7 Group Window
根据窗口数据划分的不同,目前 Apache Flink
有如下 3 种 Bounded Window:
- 滚动窗口(Tumble):窗口数据有固定的大小,窗口数据无叠加;
- 滑动窗口(Hop):窗口数据有固定大小,并且有固定的窗口重建频率,窗口数据有叠加;
- 会话窗口(Session):窗口数据没有固定的大小,根据窗口数据活跃程度划分窗口,窗口数据无叠加。
3.7.1 Tumble Window滚动窗口
Tumble
滚动窗口有固定大小,窗口数据不重叠,具体语义如下:
Tumble 滚动窗口对应的语法如下:
SELECT
[gk],
[TUMBLE_START(timeCol, size)],
[TUMBLE_END(timeCol, size)],
agg1(col1),
...
aggn(colN)
FROM Tab1
GROUP BY [gk], TUMBLE(timeCol, size)
其中:
- [gk] :决定了是否需要按照字段进行聚合;
- TUMBLE_START: 代表窗口开始时间;
- TUMBLE_END :代表窗口结束时间;
- timeCol :是流表中表示时间字段;
- size :表示窗口的大小,如 秒、分钟、小时、天。
举个例子,假如我们要计算每个人每天的订单量,按照user
进行聚合分组:
SELECT user, TUMBLE_START(rowtime, INTERVAL ‘1’ DAY) as wStart, SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL ‘1’ DAY), user;
3.7.2 Hop Window滑动窗口
Hop
滑动窗口和滚动窗口类似,窗口有固定的size
,与滚动窗口不同的是滑动窗口可以通过slide
参数控制滑动窗口的新建频率。因此当 slide
值小于窗口size
的值的时候多个滑动窗口会重叠,具体语义如下:
Hop 滑动窗口对应语法如下:
SELECT
[gk],
[HOP_START(timeCol, slide, size)] ,
[HOP_END(timeCol, slide, size)],
agg1(col1),
...
aggN(colN)
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)
每次字段的意思和 Tumble 窗口类似:
- [gk] :决定了是否需要按照字段进行聚合;
- HOP_START: 表示窗口开始时间;
- HOP_END: 表示窗口结束时间;
- timeCol :表示流表中表示时间字段;
- slide: 表示每次窗口滑动的大小;
- size: 表示整个窗口的大小,如 秒、分钟、小时、天。
举例说明,我们要每过一小时计算一次过去 24 小时内每个商品的销量:
SELECT product, SUM(amount)
FROM Orders
GROUP BY product,HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY)
3.7.3 Session Window会话时间窗口
会话时间窗口没有固定的持续时间,但它们的界限由interval
不活动时间定义,即如果在定义的间隙期间没有出现事件,则会话窗口关闭。
Seeeion 会话窗口对应语法如下:
SELECT
[gk],
SESSION_START(timeCol, gap) AS winStart,
SESSION_END(timeCol, gap) AS winEnd,
agg1(col1),
...
aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)
字段解析:
- [gk] :决定了是否需要按照字段进行聚合;
- SESSION_START :表示窗口开始时间;
- SESSION_END :表示窗口结束时间;
- timeCol :表示流表中表示时间字段;
- gap :表示窗口数据非活跃周期的时长。
例如,我们需要计算每个用户访问时间 12 小时内的订单量:
SELECT user, SESSION_START(rowtime, INTERVAL ‘12’ HOUR) AS sStart, SESSION_ROWTIME(rowtime, INTERVAL ‘12’ HOUR) AS sEnd, SUM(amount)
FROM Orders
GROUP BY SESSION(rowtime, INTERVAL ‘12’ HOUR), user
04 文末
本文主要讲解Flink Table
和SQL
的案例以及Flink SQL
常用算子的总结,谢谢大家的阅读,本文完!