13、Flink教程 - Flink高级API(状态管理)

01 引言

在前面的博客,我们已经对Flink批流一体API的使用有了一定的了解了。

在前面的教程,我们已经学习了Flink的四大基石里面的Time了,如下图,本文讲解下State 状态
 

02 Flink中的有状态计算

Flink中已经对需要进行有状态计算的API做了封装,底层已经维护好了状态!

不需要像SparkStreaming那样还得自己写updateStateByKey,也就是说我们今天学习的State只需要掌握原理,实际开发中一般都是使用Flink底层维护好的状态或第三方维护好的状态(如Flink整合Kafkaoffset维护底层就是使用的State,但是人家已经写好了的)

/**
 * 有状态计算
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 11:31 下午
 */
public class SourceDemo031 {

    public static void main(String[] args) throws Exception {

        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2.source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);

        //3.处理数据-transformation
        //3.1每一行数据按照空格切分成一个个的单词组成一个集合
        DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {

            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {

                //value就是一行行的数据
                String[] words = value.split(" ");
                for (String word : words) {

                    out.collect(word);//将切割处理的一个个的单词收集起来并返回
                }
            }
        });
        //3.2对集合中的每个单词记为1
        DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {

            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {

                //value就是进来一个个的单词
                return Tuple2.of(value, 1);
            }
        });

        //3.3对数据按照单词(key)进行分组
        //KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
        //3.4对各个组内的数据按照数量(value)进行聚合就是求sum
        DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);

        //4.输出结果-sink
        result.print();

        //5.触发执行-execute
        env.execute();
    }
}

执行 netcat,然后在终端输入 hello world,执行程序会输出什么?

  • 答案很明显,(hello, 1)和 (word,1)

那么问题来了,如果再次在终端输入 hello world,程序会输入什么?

  • 答案其实也很明显,(hello, 2)和(world, 2)。

因为Flink 知道之前已经处理过一次 hello world,这就是 state 发挥作用了,这里是被称为 keyed state 存储了之前需要统计的数据,所以Flink 知道helloworld分别出现过一次。

03 有状态和无状态计算

3.1 无状态计算

3.1.1 无状态计算特点

无状态计算的特点:

  • 不需要考虑历史数据
  • 相同的输入得到相同的输出就是无状态计算,如map/flatMap/filter…

3.1.2 无状态计算例子(消费延迟计算)

&nbsp;
上图是一个无状态计算的例子,消费延迟计算:假设现在有一个消息队列,消息队列中有一个生产者持续往消费队列写入消息,多个消费者分别从消息队列中读取消息。

从图上可以看出,生产者已经写入 16 条消息,Offset 停留在 15 ;有 3 个消费者,有的消费快,而有的消费慢。消费快的已经消费了 13 条数据,消费者慢的才消费了 7、8 条数据。

如何实时统计每个消费者落后多少条数据,如图给出了输入输出的示例。可以了解到输入的时间点有一个时间戳,生产者将消息写到了某个时间点的位置,每个消费者同一时间点分别读到了什么位置。刚才也提到了生产者写入了 15 条,消费者分别读取了 10、7、12 条。那么问题来了,怎么将生产者、消费者的进度转换为右侧示意图信息呢?

consumer 0 落后了 5 条,consumer 1落后了 8 条,consumer 2 落后了 3 条,根据 Flink的原理,此处需进行Map 操作。Map 首先把消息读取进来,然后分别相减,即可知道每个 consumer分别落后了几条。Map 一直往下发,则会得出最终结果。

大家会发现,在这种模式的计算中,无论这条输入进来多少次,输出的结果都是一样的,因为单条输入中已经包含了所需的所有信息。消费落后等于生产者减去消费者。生产者的消费在单条数据中可以得到,消费者的数据也可以在单条数据中得到,所以相同输入可以得到相同输出,这就是一个无状态的计算。

3.2 有状态计算

3.2.1 有状态计算特点

特点:

  • 需要考虑历史数据
  • 相同的输入得到不同的输出/不一定得到相同的输出,就是有状态计算,如:sum/reduce

3.2.2 有状态计算例子(访问量统计)

&nbsp;

以访问日志统计量的例子进行说明,比如当前拿到一个 Nginx 访问日志,一条日志表示一个请求,记录该请求从哪里来,访问的哪个地址,需要实时统计每个地址总共被访问了多少次,也即每个 API 被调用了多少次。

可以看到下面简化的输入和输出,输入第一条是在某个时间点请求 GET 了 /api/a;第二条日志记录了某个时间点 Post /api/b ;第三条是在某个时间点 GET了一个 /api/a,总共有 3 个 Nginx日志。

从这3 条 Nginx 日志可以看出,第一条进来输出 /api/a 被访问了一次,第二条进来输出 /api/b被访问了一次,紧接着又进来一条访问api/a,所以 api/a 被访问了 2 次。

不同的是,两条/api/aNginx日志进来的数据是一样的,但输出的时候结果可能不同,第一次输出 count=1 ,第二次输出count=2,说明相同输入可能得到不同输出。

输出的结果取决于当前请求的 API 地址之前累计被访问过多少次。第一条过来累计是 0 次,count = 1,第二条过来API的访问已经有一次了,所以/api/a访问累计次数 count=2。单条数据其实仅包含当前这次访问的信息,而不包含所有的信息。

要得到这个结果,还需要依赖 API 累计访问的量,即状态

这个计算模式是将数据输入算子中,用来进行各种复杂的计算并输出数据。这个过程中算子会去访问之前存储在里面的状态。另外一方面,它还会把现在的数据对状态的影响实时更新,如果输入 200 条数据,最后输出就是 200 条结果。

3.2.3 有状态计算的场景

&nbsp;

有状态计算用到了以下4个场景:

  • 去重:比如上游的系统数据可能会有重复,落到下游系统时希望把重复的数据都去掉。去重需要先了解哪些数据来过,哪些数据还没有来,也就是把所有的主键都记录下来,当一条数据到来后,能够看到在主键当中是否存在。
  • 窗口计算:比如统计每分钟 Nginx 日志API被访问了多少次。窗口是一分钟计算一次,在窗口触发前,如 08:00 ~ 08:01 这个窗口,前59秒的数据来了需要先放入内存,即需要把这个窗口之内的数据先保留下来,等到 8:01 时一分钟后,再将整个窗口内触发的数据输出。未触发的窗口数据也是一种状态。
  • 机器学习/深度学习:如训练的模型以及当前模型的参数也是一种状态,机器学习可能每次都用有一个数据集,需要在数据集上进行学习,对模型进行一个反馈。
  • 访问历史数据:比如与昨天的数据进行对比,需要访问一些历史数据。如果每次从外部去读,对资源的消耗可能比较大,所以也希望把这些历史数据也放入状态中做对比。

04 状态的分类

4.1 Managed State & Raw State

Flink是否接管角度可以分为:

  • ManagedState(托管状态)
  • RawState(原始状态)
类型 Managed State Raw State
状态管理方式 Flink Runtime管理(自动存储,自动恢复、内存管理上的优化) 用户自己管理(需要自己序列化 )
状态数据结构 已知的数据结构(value,list,map…) 字节数组(byte[]
推荐使用场景 大多数情况下均可使用 自定义Operator时可使用

两者的区别如下:

1、 从状态管理方式的方式来说,ManagedStateFlinkRuntime管理,自动存储,自动恢复,在内存管理上有优化;而RawState需要用户自己管理,需要自己序列化,Flink不知道State中存入的数据是什么结构,只有用户自己知道,需要最终序列化为可存储的数据结构;
2、 从状态数据结构来说,ManagedState支持已知的数据结构,如ValueListMap等而RawState只支持字节数组,所有状态都要转换为二进制字节数组才可以;
3、 从推荐使用场景来说,ManagedState大多数情况下均可使用,而RawState是当ManagedState不够用时,比如需要自定义Operator时,才会使用RawState

在实际生产中,都只推荐使用ManagedState

4.2 Keyed State & Operator State

Managed State 分为两种:Keyed StateOperator State (Raw State都是Operator State)

类型 Keyed State Operator State
算子 只能用在KeyedStream上的算子中 可以用于所有算子(常用于source,例如FlinkKafakaConsumer
对应State 每个key对应一个State(一个Operator实例处理多个key,访问相应的多个State 一个Operator实例对应一个State
并发改变 并发改变,State随着Key在实例间迁移 并发改变时有多重重新分配方式可选(均匀分配、合并后每个得到全量)
访问方式 通过RuntimeContext访问(Rich Function 实现CheckpointedFunctionListCheckpointed接口
支持结构 支持的数据结构(ValueStateListStateReducingStateAggregatingStateMapState 支持的数据结构(ListState

4.2.1 Keyed State

&nbsp;
Flink Stream模型中,Datastream经过 keyBy的操作可以变为KeyedStream

Keyed State是基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state,如stream.keyBy(…)

KeyBy之后的State,可以理解为分区过的State,每个并行keyed Operator的每个实例的每个key都有一个Keyed State,即<parallel-operator-instance,key>就是一个唯一的状态,由于每个key属于一个keyed Operator的并行实例,因此我们将其简单的理解为<operator,key>

4.2.2 Operator State

&nbsp;
这里的fromElements会调用FromElementsFunction的类,其中就使用了类型为list stateoperator state

Operator State又称为 non-keyed state,与Key无关的State,每一个 operator state都仅与一个operator的实例绑定。

Operator State 可以用于所有算子,但一般常用于 Source

05 State存储结构

前面说过有状态计算其实就是需要考虑历史数据,而历史数据需要搞个地方存储起来,Flink为了方便不同分类的State的存储和管理,提供了如下的API/数据结构来存储State
&nbsp;

5.1 State API

Keyed State 通过 RuntimeContext 访问,这需要 Operator是一个RichFunction

保存Keyed state的数据结构:

  • ValueState<T>:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值,如求按用户id统计用户交易总额
  • ListState<T>:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable<T>来遍历状态值,如统计按用户id统计用户经常登录的Ip
  • ReducingState<T>:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值
  • MapState<UK, UV>:即状态值为一个map。用户通过put或putAll方法添加元素
    需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄
  • Operator State:需要自己实现 CheckpointedFunction或 ListCheckpointed接口。保存Operator state的数据结构:(ListState<T>,BroadcastState<K,V>

举例来说,Flink中的FlinkKafkaConsumer,就使用了operator state,它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射。

&nbsp;

06 State 案例

6.1 Keyed State案例

参考:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-keyed-state

下图就word countsum 所使用的StreamGroupedReduce类为例讲解了如何在代码中使用 keyed state
&nbsp;
需求:使用KeyState中的ValueState获取数据中的最大值(实际中直接使用maxBy即可)

编码步骤:

//-1.定义一个状态用来存放最大值
private transient ValueState<Long> maxValueState;

//-2.创建一个状态描述符对象
ValueStateDescriptor descriptor = new ValueStateDescriptor("maxValueState", Long.class);

//-3.根据状态描述符获取State
maxValueState = getRuntimeContext().getState(maxValueStateDescriptor);

//-4.使用State
Long historyValue = maxValueState.value();
//判断当前值和历史值谁大
if (historyValue == null || currentValue > historyValue) 

//-5.更新状态
maxValueState.update(currentValue);    

示例代码:

/**
 * 使用KeyState中的ValueState获取流数据中的最大值(实际中直接使用maxBy即可)
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 12:13 上午
 */
public class KeyedState {

    public static void main(String[] args) throws Exception {

        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);//方便观察

        //2.Source
        DataStreamSource<Tuple2<String, Long>> tupleDS = env.fromElements(
                Tuple2.of("北京", 1L),
                Tuple2.of("上海", 2L),
                Tuple2.of("北京", 6L),
                Tuple2.of("上海", 8L),
                Tuple2.of("北京", 3L),
                Tuple2.of("上海", 4L)
        );

        //3.Transformation
        //使用KeyState中的ValueState获取流数据中的最大值(实际中直接使用maxBy即可)
        //实现方式1:直接使用maxBy--开发中使用该方式即可
        //min只会求出最小的那个字段,其他的字段不管
        //minBy会求出最小的那个字段和对应的其他的字段
        //max只会求出最大的那个字段,其他的字段不管
        //maxBy会求出最大的那个字段和对应的其他的字段
        SingleOutputStreamOperator<Tuple2<String, Long>> result = tupleDS.keyBy(t -> t.f0)
                .maxBy(1);

        //实现方式2:使用KeyState中的ValueState---学习测试时使用,或者后续项目中/实际开发中遇到复杂的Flink没有实现的逻辑,才用该方式!
        SingleOutputStreamOperator<Tuple3<String, Long, Long>> result2 = tupleDS.keyBy(t -> t.f0)
                .map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {

                    //-1.定义状态用来存储最大值
                    private ValueState<Long> maxValueState = null;

                    @Override
                    public void open(Configuration parameters) throws Exception {

                        //-2.定义状态描述符:描述状态的名称和里面的数据类型
                        ValueStateDescriptor descriptor = new ValueStateDescriptor("maxValueState", Long.class);
                        //-3.根据状态描述符初始化状态
                        maxValueState = getRuntimeContext().getState(descriptor);
                    }

                    @Override
                    public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {

                        //-4.使用State,取出State中的最大值/历史最大值
                        Long historyMaxValue = maxValueState.value();
                        Long currentValue = value.f1;
                        if (historyMaxValue == null || currentValue > historyMaxValue) {

                            //5-更新状态,把当前的作为新的最大值存到状态中
                            maxValueState.update(currentValue);
                            return Tuple3.of(value.f0, currentValue, currentValue);
                        } else {

                            return Tuple3.of(value.f0, currentValue, historyMaxValue);
                        }
                    }
                });
        //4.Sink
        //result.print();
        result2.print();

        //5.execute
        env.execute();
    }
}

运行结果:
&nbsp;

6.2 Operator State案例

参考:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state

下图对word count 示例中的FromElementsFunction类进行详解并分享如何在代码中使用operator state
&nbsp;
需求:使用ListState存储offset模拟Kafkaoffset维护

编码步骤:

//-1.声明一个OperatorState来记录offset
private ListState<Long> offsetState = null;
private Long offset = 0L;

//-2.创建状态描述器
ListStateDescriptor<Long> descriptor = new ListStateDescriptor<Long>("offsetState", Long.class);

//-3.根据状态描述器获取State
offsetState = context.getOperatorStateStore().getListState(descriptor);

//-4.获取State中的值
Iterator<Long> iterator = offsetState.get().iterator();
if (iterator.hasNext()) {

     //迭代器中有值
    offset = iterator.next();//取出的值就是offset
}
offset += 1L;
ctx.collect("subTaskId:" + getRuntimeContext().getIndexOfThisSubtask() + ",当前的offset为:" + offset);
if (offset % 5 == 0) {

     //每隔5条消息,模拟一个异常

//-5.保存State到Checkpoint中
offsetState.clear();//清理内存中存储的offset到Checkpoint中

//-6.将offset存入State中
offsetState.add(offset);

示例代码:

/**
 * OperatorState
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 12:17 上午
 */
public class OperatorState {

    public static void main(String[] args) throws Exception {

        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //先直接使用下面的代码设置Checkpoint时间间隔和磁盘路径以及代码遇到异常后的重启策略,下午会学
        env.enableCheckpointing(1000);//每隔1s执行一次Checkpoint
        env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //固定延迟重启策略: 程序出现异常的时候,重启2次,每次延迟3秒钟重启,超过2次,程序退出
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 3000));

        //2.Source
        DataStreamSource<String> sourceData = env.addSource(new MyKafkaSource());

        //3.Transformation
        //4.Sink
        sourceData.print();

        //5.execute
        env.execute();
    }

    /**
     * MyKafkaSource就是模拟的FlinkKafkaConsumer并维护offset
     */
    public static class MyKafkaSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {

        //-1.声明一个OperatorState来记录offset
        private ListState<Long> offsetState = null;
        private Long offset = 0L;
        private boolean flag = true;

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {

            //-2.创建状态描述器
            ListStateDescriptor descriptor = new ListStateDescriptor("offsetState", Long.class);
            //-3.根据状态描述器初始化状态
            offsetState = context.getOperatorStateStore().getListState(descriptor);
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {

            //-4.获取并使用State中的值
            Iterator<Long> iterator = offsetState.get().iterator();
            if (iterator.hasNext()) {

                offset = iterator.next();
            }
            while (flag) {

                offset += 1;
                int id = getRuntimeContext().getIndexOfThisSubtask();
                ctx.collect("分区:" + id + "消费到的offset位置为:" + offset);//1 2 3 4 5 6
                //Thread.sleep(1000);
                TimeUnit.SECONDS.sleep(2);
                if (offset % 5 == 0) {

                    System.out.println("程序遇到异常了.....");
                    throw new Exception("程序遇到异常了.....");
                }
            }
        }

        @Override
        public void cancel() {

            flag = false;
        }

        /**
         * 下面的snapshotState方法会按照固定的时间间隔将State信息存储到Checkpoint/磁盘中,也就是在磁盘做快照!
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {

            //-5.保存State到Checkpoint中
            offsetState.clear();//清理内存中存储的offset到Checkpoint中
            //-6.将offset存入State中
            offsetState.add(offset);
        }
    }
}

运行结果:
&nbsp;

07 文末

本文主要讲解了Flink高级API里面的状态管理,谢谢大家的阅读,本文完!