09、Flink教程 - Flink批流一体API(Connectors示例)

01 引言

在前面的博客,我们已经对Flink的程序模型里的Sink使用有了一定的了解了。

在前面的章节,我们知道了一些比较基本的 Source 和 Sink 已经内置在 Flink 里。

  • 预定义 data sources :支持从文件、目录、socket,以及 collections 和 iterators 中读取数据。
  • 预定义 data sinks :支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket。

而连接器(Connectors)可以和多种多样的第三方系统进行交互,本文来讲解下。

02 Connectors

Connectors连接器参考:https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/connectors/

2.1 Flink目前支持的Connectors

Flink目前支持以下系统:

系统 使用地方
Apache Kafka source/sink
Apache Cassandra sink
Amazon Kinesis Streams source/sink
Elasticsearch sink
FileSystem(包括 Hadoop ) - 仅支持流 sink
FileSystem(包括 Hadoop ) - 流批统一 sink
RabbitMQ source/sink
Apache NiFi source/sink
Twitter Streaming API source
Google PubSub source/sink
JDBC sink

在使用一种连接器时,通常需要额外的第三方组件,比如:数据存储服务器或者消息队列,要注意这些列举的连接器是Flink 工程的一部分,包含在发布的源码中,但是不包含在二进制发行版中。

Flink 还有些一些额外的连接器通过 Apache Bahir 发布, 包括:

系统 使用地方
Apache ActiveMQ source/sink
Apache Flume sink
Redis sink
Akka sink
Netty source

2.2 JDBC案例

参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/jdbc.html

代码如下:

/**
 * Connectors -JDBC
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 4:59 下午
 */
public class ConnectorsDemo {

    public static void main(String[] args) throws Exception {

        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        env.fromElements(new Student(null, "tonyma111", 18))
                //3.Transformation
                //4.Sink
                .addSink(JdbcSink.sink(
                        "INSERT INTO t_student (id, name, age) VALUES (null, ?, ?)",
                        (ps, s) -> {

                            ps.setString(1, s.getName());
                            ps.setInt(2, s.getAge());
                        },
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://localhost:3306/big_data")
                                .withUsername("root")
                                .withPassword("123456")
                                .withDriverName("com.mysql.jdbc.Driver")
                                .build()));
        //5.execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {

        private Integer id;
        private String name;
        private Integer age;
    }
}

运行结果:
 

2.3 Kafa案例

参考:https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
--------------Flink 里已经提供了一些绑定的Connector,例如kafka sourcesinkEs sink等。读写 kafkaesrabbitMQ时可以直接使用相应connectorapi即可,虽然该部分是 Flink项目源代码里的一部分,但是真正意义上不算作Flink引擎相关逻辑,并且该部分没有打包在二进制的发布包里面。所以在提交Job 时候需要注意, job代码jar包中一定要将相应的connetor相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常

2.3.1 Kafa相关命令

●查看当前服务器中的所有topic:

/export/server/kafka/bin/kafka-topics.sh --list --zookeeper  node1:2181

●创建topic

/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka

●查看某个Topic的详情

/export/server/kafka/bin/kafka-topics.sh --topic flink_kafka --describe --zookeeper node1:2181

●删除topic

/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic flink_kafka

●通过shell命令发送消息

/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka

●通过shell消费消息

/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka --from-beginning 

●修改分区

 /export/server/kafka/bin/kafka-topics.sh --alter --partitions 4 --topic flink_kafka --zookeeper node1:2181

2.3.2 Kafka Consumer代码

需求:使用flink-connector-kafka_2.12中的FlinkKafkaConsumer消费Kafka中的数据做WordCount

需要设置如下参数:

  • 1.订阅的主题
  • 2.反序列化规则
  • 3.消费者属性-集群地址
  • 4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理)
  • 5.消费者属性-offset重置规则,如earliest/latest…
  • 6.动态分区检测(当kafka的分区数变化/增加时,Flink能够检测到!)
  • 7.如果没有设置Checkpoint,那么可以设置自动提交offset,后续学习了Checkpoint会把offset随着做Checkpoint的时候提交到Checkpoint和默认主题中
/**
 * KafkaConsumer
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 5:17 下午
 */
public class KafkaConsumer {

    public static void main(String[] args) throws Exception {

        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.Source
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        props.setProperty("group.id", "flink");
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("flink.partition-discovery.interval-millis", "5000");//会开启一个后台线程每隔5s检测一下Kafka的分区情况
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");

        //kafkaSource就是KafkaConsumer
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props);
        kafkaSource.setStartFromGroupOffsets();//设置从记录的offset开始消费,如果没有记录从auto.offset.reset配置开始消费

        //kafkaSource.setStartFromEarliest();//设置直接从Earliest消费,和auto.offset.reset配置无关
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);

        //3.Transformation
        //3.1切割并记为1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {

                String[] words = value.split(" ");
                for (String word : words) {

                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //3.2分组
        KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);
        //3.3聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);

        //4.Sink
        result.print();

        //5.execute
        env.execute();
    }
}

2.3.3 Kafka Producer代码

需求:将Flink集合中的数据通过自定义Sink保存到Kafka

/**
 * KafkaProducer
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 5:19 下午
 */
public class KafkaProducer {

    public static void main(String[] args) throws Exception {

        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.Source
        DataStreamSource<Student> studentDS = env.fromElements(new Student(1, "tonyma", 18));

        //3.Transformation
        //注意:目前来说我们使用Kafka使用的序列化和反序列化都是直接使用最简单的字符串,所以先将Student转为字符串
        //可以直接调用Student的toString,也可以转为JSON
        SingleOutputStreamOperator<String> jsonDS = studentDS.map(new MapFunction<Student, String>() {

            @Override
            public String map(Student value) throws Exception {

                //String str = value.toString();
                String jsonStr = JSON.toJSONString(value);
                return jsonStr;
            }
        });

        //4.Sink
        jsonDS.print();
        //根据参数创建KafkaProducer/KafkaSink
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props);
        jsonDS.addSink(kafkaSink);

        //5.execute
        env.execute();

        // /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {

        private Integer id;
        private String name;
        private Integer age;
    }
}

2.4 Redis案例

参考:https://bahir.apache.org/docs/flink/current/flink-streaming-redis/

2.4.1 相关API

通过flink 操作redis 其实我们可以通过传统的redis 连接池Jpoools 进行redis 的相关操作,但是flink提供了专门操作redisRedisSink,使用起来更方便,而且不用我们考虑性能的问题,接下来将主要介绍RedisSink 如何使用。

RedisSink 核心类是RedisMapper是一个接口,使用时我们要编写自己的redis操作类实现这个接口中的三个方法,如下所示:

  • getCommandDescription() : 设置使用的redis 数据结构类型,和key 的名称,通过RedisCommand 设置数据结构类型
  • String getKeyFromData(T data):设置value 中的键值对key的值
  • String getValueFromData(T data):设置value 中的键值对value的值

使用RedisCommand设置数据结构类型时和redis结构对应关系:

Data Type Redis Command [Sink]
HASH HSET
LIST RPUSH, LPUSH
SET SADD
PUBSUB PUBLISH
STRING SET
HYPER_LOG_LOG PFADD
SORTED_SET ZADD
SORTED_SET ZADD
SORTED_SET ZREM

2.4.2 示例代码

需求:将Flink集合中的数据通过自定义Sink保存到Redis

代码如下:

/**
 * Connector-Redis
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 5:25 下午
 */
public class ConnectorsDemoRedis {

    public static void main(String[] args) throws Exception {

        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.Source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);

        //3.Transformation
        //3.1切割并记为1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {

                String[] words = value.split(" ");
                for (String word : words) {

                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //3.2分组
        KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);
        //3.3聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);

        //4.Sink
        result.print();
        // * 最后将结果保存到Redis
        // * 注意:存储到Redis的数据结构:使用hash也就是map
        // * key            value
        // * WordCount      (单词,数量)

        //-1.创建RedisSink之前需要创建RedisConfig
        //连接单机版Redis
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
        //连接集群版Redis
        //HashSet<InetSocketAddress> nodes = new HashSet<>(Arrays.asList(new InetSocketAddress(InetAddress.getByName("node1"), 6379),new InetSocketAddress(InetAddress.getByName("node2"), 6379),new InetSocketAddress(InetAddress.getByName("node3"), 6379)));
        //FlinkJedisClusterConfig conf2 = new FlinkJedisClusterConfig.Builder().setNodes(nodes).build();
        //连接哨兵版Redis
        //Set<String> sentinels = new HashSet<>(Arrays.asList("node1:26379", "node2:26379", "node3:26379"));
        //FlinkJedisSentinelConfig conf3 = new FlinkJedisSentinelConfig.Builder().setMasterName("mymaster").setSentinels(sentinels).build();

        //-3.创建并使用RedisSink
        result.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));

        //5.execute
        env.execute();
    }

    /**
     * -2.定义一个Mapper用来指定存储到Redis中的数据结构
     */
    public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {

        @Override
        public RedisCommandDescription getCommandDescription() {

            return new RedisCommandDescription(RedisCommand.HSET, "WordCount");
        }

        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {

            return data.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {

            return data.f1.toString();
        }
    }
}

03 文末

本文主要讲解与Flink批流一体API相关的连接器Connectors(连接器),谢谢大家的阅读,本文完!