01 引言
在前面的博客,我们已经可以基本把Flink
的环境搭建起来了,
Flink
的环境既然有了,那么本文开始讲解Flink
的入门案例。
02 开发前准备
在写入门案例之前,需要知道一些概念,即:API
和编程模型。
2.1 API
在《02、Flink教程 - Flink入门》讲述过Flink
是由以下组件栈组成的:
- 物理部署层
- RuntimeTime核心层
- API & Libraires
- 扩展库
Flink提供了多个层次的API
供开发者使用,越往上抽象程度越高,使用起来越方便,越往下越底层,使用起来难度越大。
注意:在Flink1.12
时支持流批一体,DataSetAPI
已经不推荐使用了,所以会优先使用DataStream
流式API
,既支持无界数据处理/流处理,也支持有界数据处理/批处理!
参阅文献:
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/batch/
- https://developer.aliyun.com/article/780123?spm=a2c6h.12873581.0.0.1e3e46ccbYFFrC
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html
2.2 编程模型
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html
Flink
应用程序结构主要包含三部分,Source/Transformation/Sink
,如下图所示:
03 入门案例
现在提一个需求:使用Flink
实现WordCount
。
3.1 项目搭建
首先pom
文件添加依赖(为了方便以后的项目演示,这里的pom
文件依赖了将来要讲解的库):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ylw</groupId>
<artifactId>flink-demo</artifactId>
<version>1.0.0</version>
<description>Flink demo</description>
<developers>
<developer>
<name>Yang Lin Wei</name>
<url>https://yanglinwei.blog.csdn.net/</url>
</developer>
</developers>
<!-- 指定仓库位置,依次为aliyun、apache和cloudera仓库 -->
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>apache</id>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.12.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink执行计划,这是1.9版本之前的-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- blink执行计划,1.11+默认的-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.12</artifactId>
<version>${flink.version}</version>
</dependency>-->
<!-- flink连接器-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.12</artifactId>
<version>${flink.version}</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>${flink.version}</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_2.12</artifactId>
<version>${flink.version}</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.10.0</version>
</dependency>-->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
<exclusions>
<exclusion>
<artifactId>flink-streaming-java_2.11</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-runtime_2.11</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-core</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-java</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-10.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
<!--<version>8.0.20</version>-->
</dependency>
<!-- 高性能异步组件:Vertx-->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-jdbc-client</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-redis-client</artifactId>
<version>3.9.0</version>
</dependency>
<!-- 日志 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<scope>provided</scope>
</dependency>
<!-- 参考:https://blog.csdn.net/f641385712/article/details/84109098-->
<!--<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libfb303</artifactId>
<version>0.9.3</version>
<type>pom</type>
<scope>provided</scope>
</dependency>-->
<!--<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.2-jre</version>
</dependency>-->
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<!--<encoding>${project.build.sourceEncoding}</encoding>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<!-- 打包插件(会包含所有依赖) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!--
zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- 设置jar包的入口类(可选) -->
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
然后配置log4j.properties
文件:
log4j.rootLogger=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{
HH:mm:ss,SSS} %-5p %-60c %x - %m%n
3.2 代码实现
这里我们按照官方的入门案例来讲解:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html
开发步骤:
- step1:准备环境-env
- step2:准备数据-source
- step3:处理数据-transformation
- step4:输出结果-sink
- step5:触发执行-execute
创建环境可以使用如下3种方式:
getExecutionEnvironment() //推荐使用
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
3.2.1 基于DataSet
/**
* 需求:使用Flink完成WordCount-DataSet
*
* @author : YangLinWei
* @createTime: 2022/3/7 9:47 上午
* <p>
* 编码步骤:
* 1.准备环境-env
* 2.准备数据-source
* 3.处理数据-transformation
* 4.输出结果-sink
* 5.触发执行-execute//如果有print,DataSet不需要调用execute,DataStream需要调用execute
*/
public class WordCount1 {
/**
* 老版本的批处理API如下,但已经不推荐使用了
*/
public static void main(String[] args) throws Exception {
//1.准备环境-env
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//2.准备数据-source
DataSet<String> lineDS = env.fromElements("ylw hadoop spark", "ylw hadoop spark", "ylw hadoop", "ylw");
//3.处理数据-transformation
//3.1每一行数据按照空格切分成一个个的单词组成一个集合
/*
public interface FlatMapFunction<T, O> extends Function, Serializable {
void flatMap(T value, Collector<O> out) throws Exception;
}
*/
DataSet<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
//value就是一行行的数据
String[] words = value.split(" ");
for (String word : words) {
out.collect(word);//将切割处理的一个个的单词收集起来并返回
}
}
});
//3.2对集合中的每个单词记为1
/*
public interface MapFunction<T, O> extends Function, Serializable {
O map(T value) throws Exception;
}
*/
DataSet<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
//value就是进来一个个的单词
return Tuple2.of(value, 1);
}
});
//3.3对数据按照单词(key)进行分组
//0表示按照tuple中的索引为0的字段,也就是key(单词)进行分组
UnsortedGrouping<Tuple2<String, Integer>> groupedDS = wordAndOnesDS.groupBy(0);
//3.4对各个组内的数据按照数量(value)进行聚合就是求sum
//1表示按照tuple中的索引为1的字段也就是按照数量进行聚合累加!
DataSet<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);
//3.5排序
DataSet<Tuple2<String, Integer>> result = aggResult.sortPartition(1, Order.DESCENDING).setParallelism(1);
//4.输出结果-sink
result.print();
//5.触发执行-execute//如果有print,DataSet不需要调用execute,DataStream需要调用execute
//env.execute();//'execute()', 'count()', 'collect()', or 'print()'.
}
}
运行结果:
3.2.2 基于DataStream
/**
* 需求:使用Flink完成WordCount-DataStream
*
* @author : YangLinWei
* @createTime: 2022/3/7 10:02 上午
* <p>
* 编码步骤:
* 1.准备环境-env
* 2.准备数据-source
* 3.处理数据-transformation
* 4.输出结果-sink
* 5.触发执行-execute
*/
public class WordCount2 {
/**
* 新版本的流批统一API,既支持流处理也支持批处理
*/
public static void main(String[] args) throws Exception {
//1.准备环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
//env.setRuntimeMode(RuntimeExecutionMode.BATCH);
//2.准备数据-source
DataStream<String> linesDS = env.fromElements("ylw hadoop spark", "ylw hadoop spark", "ylw hadoop", "ylw");
//3.处理数据-transformation
//3.1每一行数据按照空格切分成一个个的单词组成一个集合
/*
public interface FlatMapFunction<T, O> extends Function, Serializable {
void flatMap(T value, Collector<O> out) throws Exception;
}
*/
DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
//value就是一行行的数据
String[] words = value.split(" ");
for (String word : words) {
out.collect(word);//将切割处理的一个个的单词收集起来并返回
}
}
});
//3.2对集合中的每个单词记为1
/*
public interface MapFunction<T, O> extends Function, Serializable {
O map(T value) throws Exception;
}
*/
DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
//value就是进来一个个的单词
return Tuple2.of(value, 1);
}
});
//3.3对数据按照单词(key)进行分组
//0表示按照tuple中的索引为0的字段,也就是key(单词)进行分组
//KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
//3.4对各个组内的数据按照数量(value)进行聚合就是求sum
//1表示按照tuple中的索引为1的字段也就是按照数量进行聚合累加!
DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);
//4.输出结果-sink
result.print();
//5.触发执行-execute
env.execute();//DataStream需要调用execute
}
}
运行结果:
3.2.3 Lambda版
/**
* 需求:使用Flink完成WordCount-DataStream--使用lambda表达式
*
* @author : YangLinWei
* @createTime: 2022/3/7 10:05 上午
* <p>
* 编码步骤:
* 1.准备环境-env
* 2.准备数据-source
* 3.处理数据-transformation
* 4.输出结果-sink
* 5.触发执行-execute
*/
public class WordCount3 {
public static void main(String[] args) throws Exception {
//1.准备环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
//env.setRuntimeMode(RuntimeExecutionMode.BATCH);
//2.准备数据-source
DataStream<String> linesDS = env.fromElements("ylw hadoop spark", "ylw hadoop spark", "ylw hadoop", "ylw");
//3.处理数据-transformation
//3.1每一行数据按照空格切分成一个个的单词组成一个集合
/*
public interface FlatMapFunction<T, O> extends Function, Serializable {
void flatMap(T value, Collector<O> out) throws Exception;
}
*/
//lambda表达式的语法:
// (参数)->{方法体/函数体}
//lambda表达式就是一个函数,函数的本质就是对象
DataStream<String> wordsDS = linesDS.flatMap(
(String value, Collector<String> out) -> Arrays.stream(value.split(" ")).forEach(out::collect)
).returns(Types.STRING);
//3.2对集合中的每个单词记为1
/*
public interface MapFunction<T, O> extends Function, Serializable {
O map(T value) throws Exception;
}
*/
/*DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(
(String value) -> Tuple2.of(value, 1)
).returns(Types.TUPLE(Types.STRING, Types.INT));*/
DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(
(String value) -> Tuple2.of(value, 1)
, TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
})
);
//3.3对数据按照单词(key)进行分组
//0表示按照tuple中的索引为0的字段,也就是key(单词)进行分组
//KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
//KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy((KeySelector<Tuple2<String, Integer>, String>) t -> t.f0);
KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
//3.4对各个组内的数据按照数量(value)进行聚合就是求sum
//1表示按照tuple中的索引为1的字段也就是按照数量进行聚合累加!
DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);
//4.输出结果-sink
result.print();
//5.触发执行-execute
env.execute();
}
}
输出结果:
3.2.3 在Yarn上运行(待验证)
step1:首先运行hadoop
cd /usr/local/Cellar/hadoop/3.3.1/sbin
./start-all.sh
step2:注意写入HDFS
存在权限问题,进行如下设置:
hadoop fs -chmod -R 777 /
step3:代码如下(注意代码添加了:System.setProperty("HADOOP_USER_NAME", "root")
):
/**
* 需求:使用Flink完成WordCount-DataStream--使用lambda表达式--修改代码使适合在Yarn上运行
*
* @author : YangLinWei
* @createTime: 2022/3/7 10:11 上午
* <p>
* 编码步骤:
* 1.准备环境-env
* 2.准备数据-source
* 3.处理数据-transformation
* 4.输出结果-sink
* 5.触发执行-execute//批处理不需要调用!流处理需要
*/
public class WordCount4 {
public static void main(String[] args) throws Exception {
//获取参数
ParameterTool params = ParameterTool.fromArgs(args);
String output = null;
if (params.has("output")) {
output = params.get("output");
} else {
output = "hdfs://127.0.0.1:8020/wordcount/output_" + System.currentTimeMillis();
}
//1.准备环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
//env.setRuntimeMode(RuntimeExecutionMode.BATCH);
//2.准备数据-source
DataStream<String> linesDS = env.fromElements("ylw hadoop spark", "ylw hadoop spark", "ylw hadoop", "ylw");
//3.处理数据-transformation
DataStream<Tuple2<String, Integer>> result = linesDS
.flatMap(
(String value, Collector<String> out) -> Arrays.stream(value.split(" ")).forEach(out::collect)
).returns(Types.STRING)
.map(
(String value) -> Tuple2.of(value, 1)
).returns(Types.TUPLE(Types.STRING, Types.INT))
//.keyBy(0);
.keyBy((KeySelector<Tuple2<String, Integer>, String>) t -> t.f0)
.sum(1);
//4.输出结果-sink
result.print();
//如果执行报hdfs权限相关错误,可以执行 hadoop fs -chmod -R 777 /
System.setProperty("HADOOP_USER_NAME", "root");//设置用户名
//result.writeAsText("hdfs://node1:8020/wordcount/output_"+System.currentTimeMillis()).setParallelism(1);
result.writeAsText(output).setParallelism(1);
//5.触发执行-execute
env.execute();
}
}
step4:打包
step5:打包成功后jar包:
step5:提交至服务器指定的路径(参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html):
- 注意:本系统是mac系统,所有环境都在本机,所以就不上传了。
step6:Flink命令执行jar:
flink run -Dexecution.runtime-mode=BATCH \
-m yarn-cluster \
-yjm 1024 \
-ytm 1024 \
-c com.ylw.WordCount4 \
项目路径/flink-demo/target/original-flink-demo-1.0.0.jar\
--output hdfs://127.0.0.1:8020/wordcount/output_xx
在Web页面可以观察到提交的程序:
或者在Standalone
模式下使用web
界面提交
05 文末
本文主要讲解了Flink
的入门例子,主要基于DataSet
和DataStream
来讲解,谢谢大家的阅读,本文完!