01 引言
在前面的博客,我们已经对Flink
的程序模型里的Sink
使用有了一定的了解了。
在前面的章节,我们知道了一些比较基本的 Source 和 Sink 已经内置在 Flink 里。
- 预定义 data sources :支持从文件、目录、socket,以及 collections 和 iterators 中读取数据。
- 预定义 data sinks :支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket。
而连接器(Connectors
)可以和多种多样的第三方系统进行交互,本文来讲解下。
02 Connectors
Connectors连接器参考:https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/connectors/
2.1 Flink目前支持的Connectors
Flink目前支持以下系统:
系统 | 使用地方 |
---|---|
Apache Kafka | source/sink |
Apache Cassandra | sink |
Amazon Kinesis Streams | source/sink |
Elasticsearch | sink |
FileSystem(包括 Hadoop ) - 仅支持流 | sink |
FileSystem(包括 Hadoop ) - 流批统一 | sink |
RabbitMQ | source/sink |
Apache NiFi | source/sink |
Twitter Streaming API | source |
Google PubSub | source/sink |
JDBC | sink |
在使用一种连接器时,通常需要额外的第三方组件,比如:数据存储服务器或者消息队列,要注意这些列举的连接器是Flink
工程的一部分,包含在发布的源码中,但是不包含在二进制发行版中。
Flink
还有些一些额外的连接器通过 Apache Bahir 发布, 包括:
系统 | 使用地方 |
---|---|
Apache ActiveMQ | source/sink |
Apache Flume | sink |
Redis | sink |
Akka | sink |
Netty | source |
2.2 JDBC案例
参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/jdbc.html
代码如下:
/**
* Connectors -JDBC
*
* @author : YangLinWei
* @createTime: 2022/3/7 4:59 下午
*/
public class ConnectorsDemo {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.Source
env.fromElements(new Student(null, "tonyma111", 18))
//3.Transformation
//4.Sink
.addSink(JdbcSink.sink(
"INSERT INTO t_student (id, name, age) VALUES (null, ?, ?)",
(ps, s) -> {
ps.setString(1, s.getName());
ps.setInt(2, s.getAge());
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/big_data")
.withUsername("root")
.withPassword("123456")
.withDriverName("com.mysql.jdbc.Driver")
.build()));
//5.execute
env.execute();
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Student {
private Integer id;
private String name;
private Integer age;
}
}
运行结果:
2.3 Kafa案例
参考:https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
--------------Flink
里已经提供了一些绑定的Connector
,例如kafka source
和sink
,Es sink
等。读写kafka
、es
、rabbitMQ
时可以直接使用相应connector
的api
即可,虽然该部分是Flink
项目源代码里的一部分,但是真正意义上不算作Flink
引擎相关逻辑,并且该部分没有打包在二进制的发布包里面。所以在提交Job
时候需要注意,job
代码jar
包中一定要将相应的connetor
相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常。
2.3.1 Kafa相关命令
●查看当前服务器中的所有topic
:
/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181
●创建topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka
●查看某个Topic的详情
/export/server/kafka/bin/kafka-topics.sh --topic flink_kafka --describe --zookeeper node1:2181
●删除topic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic flink_kafka
●通过shell命令发送消息
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka
●通过shell消费消息
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka --from-beginning
●修改分区
/export/server/kafka/bin/kafka-topics.sh --alter --partitions 4 --topic flink_kafka --zookeeper node1:2181
2.3.2 Kafka Consumer代码
需求:使用flink-connector-kafka_2.12
中的FlinkKafkaConsumer
消费Kafka
中的数据做WordCount
需要设置如下参数:
- 1.订阅的主题
- 2.反序列化规则
- 3.消费者属性-集群地址
- 4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理)
- 5.消费者属性-offset重置规则,如earliest/latest…
- 6.动态分区检测(当kafka的分区数变化/增加时,Flink能够检测到!)
- 7.如果没有设置Checkpoint,那么可以设置自动提交offset,后续学习了Checkpoint会把offset随着做Checkpoint的时候提交到Checkpoint和默认主题中
/**
* KafkaConsumer
*
* @author : YangLinWei
* @createTime: 2022/3/7 5:17 下午
*/
public class KafkaConsumer {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.Source
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092");
props.setProperty("group.id", "flink");
props.setProperty("auto.offset.reset", "latest");
props.setProperty("flink.partition-discovery.interval-millis", "5000");//会开启一个后台线程每隔5s检测一下Kafka的分区情况
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "2000");
//kafkaSource就是KafkaConsumer
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props);
kafkaSource.setStartFromGroupOffsets();//设置从记录的offset开始消费,如果没有记录从auto.offset.reset配置开始消费
//kafkaSource.setStartFromEarliest();//设置直接从Earliest消费,和auto.offset.reset配置无关
DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
//3.Transformation
//3.1切割并记为1
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
});
//3.2分组
KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);
//3.3聚合
SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
//4.Sink
result.print();
//5.execute
env.execute();
}
}
2.3.3 Kafka Producer代码
需求:将Flink
集合中的数据通过自定义Sink
保存到Kafka
/**
* KafkaProducer
*
* @author : YangLinWei
* @createTime: 2022/3/7 5:19 下午
*/
public class KafkaProducer {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.Source
DataStreamSource<Student> studentDS = env.fromElements(new Student(1, "tonyma", 18));
//3.Transformation
//注意:目前来说我们使用Kafka使用的序列化和反序列化都是直接使用最简单的字符串,所以先将Student转为字符串
//可以直接调用Student的toString,也可以转为JSON
SingleOutputStreamOperator<String> jsonDS = studentDS.map(new MapFunction<Student, String>() {
@Override
public String map(Student value) throws Exception {
//String str = value.toString();
String jsonStr = JSON.toJSONString(value);
return jsonStr;
}
});
//4.Sink
jsonDS.print();
//根据参数创建KafkaProducer/KafkaSink
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092");
FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props);
jsonDS.addSink(kafkaSink);
//5.execute
env.execute();
// /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Student {
private Integer id;
private String name;
private Integer age;
}
}
2.4 Redis案例
参考:https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
2.4.1 相关API
通过flink
操作redis
其实我们可以通过传统的redis
连接池Jpoools
进行redis
的相关操作,但是flink
提供了专门操作redis
的RedisSink
,使用起来更方便,而且不用我们考虑性能的问题,接下来将主要介绍RedisSink
如何使用。
RedisSink
核心类是RedisMapper
是一个接口,使用时我们要编写自己的redis
操作类实现这个接口中的三个方法,如下所示:
- getCommandDescription() : 设置使用的redis 数据结构类型,和key 的名称,通过RedisCommand 设置数据结构类型
- String getKeyFromData(T data):设置value 中的键值对key的值
- String getValueFromData(T data):设置value 中的键值对value的值
使用RedisCommand
设置数据结构类型时和redis
结构对应关系:
Data Type | Redis Command [Sink] |
---|---|
HASH | HSET |
LIST | RPUSH, LPUSH |
SET | SADD |
PUBSUB | PUBLISH |
STRING | SET |
HYPER_LOG_LOG | PFADD |
SORTED_SET | ZADD |
SORTED_SET | ZADD |
SORTED_SET | ZREM |
2.4.2 示例代码
需求:将Flink
集合中的数据通过自定义Sink
保存到Redis
代码如下:
/**
* Connector-Redis
*
* @author : YangLinWei
* @createTime: 2022/3/7 5:25 下午
*/
public class ConnectorsDemoRedis {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.Source
DataStream<String> linesDS = env.socketTextStream("node1", 9999);
//3.Transformation
//3.1切割并记为1
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
});
//3.2分组
KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);
//3.3聚合
SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
//4.Sink
result.print();
// * 最后将结果保存到Redis
// * 注意:存储到Redis的数据结构:使用hash也就是map
// * key value
// * WordCount (单词,数量)
//-1.创建RedisSink之前需要创建RedisConfig
//连接单机版Redis
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
//连接集群版Redis
//HashSet<InetSocketAddress> nodes = new HashSet<>(Arrays.asList(new InetSocketAddress(InetAddress.getByName("node1"), 6379),new InetSocketAddress(InetAddress.getByName("node2"), 6379),new InetSocketAddress(InetAddress.getByName("node3"), 6379)));
//FlinkJedisClusterConfig conf2 = new FlinkJedisClusterConfig.Builder().setNodes(nodes).build();
//连接哨兵版Redis
//Set<String> sentinels = new HashSet<>(Arrays.asList("node1:26379", "node2:26379", "node3:26379"));
//FlinkJedisSentinelConfig conf3 = new FlinkJedisSentinelConfig.Builder().setMasterName("mymaster").setSentinels(sentinels).build();
//-3.创建并使用RedisSink
result.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));
//5.execute
env.execute();
}
/**
* -2.定义一个Mapper用来指定存储到Redis中的数据结构
*/
public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "WordCount");
}
@Override
public String getKeyFromData(Tuple2<String, Integer> data) {
return data.f0;
}
@Override
public String getValueFromData(Tuple2<String, Integer> data) {
return data.f1.toString();
}
}
}
03 文末
本文主要讲解与Flink
批流一体API
相关的连接器Connectors
(连接器),谢谢大家的阅读,本文完!