欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

大数据]Flink 学习笔记 - 状态管理

最编程 2024-03-27 07:03:02
...
  1. 在flink中, 算子任务可以分为有状态无状态两种情况

  2. 无状态的算子任务只需要观察每个独立事件, 然后根据输入的数据直接转换成结果; 如map, filter, flatMap不依赖其他数据的算子都属于无状态算子

  3. 有状态算子的任务, 除了当前数据之外, 还需要一些其他的数据来得到计算结果, 这里的其他数据就是所谓的状态

  4. 对于有状态算子, 一般处理流程如下:

    1. 算子接受上游发来的数据
    2. 获取当前的状态
    3. 根据业务逻辑来计算/更新状态
    4. 得到计算结果, 输出发送到下游任务
    5. 在这里插入图片描述
  5. 状态主要可以分为: 托管状态原始状态

    1. 托管状态: 有flink统一管理, 状态的存储访问/故障恢复和重组等一系列问题都由flink实现, 我们只需要调用接口
    2. 托管状态又可以分为两类: 算子状态键控状态 后者是经过keyBy算子后的状态
    3. 原始状态: 是自己定义的, 相当于开辟了一块内存, 有我们自己实现状态管理/序列化和故障恢复

键控状态(Keyed State)

键控状态是按照键来访问和维护的状态

值状态 (Value State)

案例: 检测传感器的水位值, 若连续两个水位差值大于10则报警

输入输出为:

s1 1 1
s1 2 11
s1 3 30
# [传感器id s1], 当前水位值: 30, 与上一条水位值: 11, 相差超过10!!!
s1 4 10
# [传感器id s1], 当前水位值: 10, 与上一条水位值: 30, 相差超过10!!!
s5 5 30
# [传感器id s5], 当前水位值: 30, 与上一条水位值: 0, 相差超过10!!! (初始值为0, 因此输出, 也可以看到flink状态是按键分区的)

列表状态

列表状态存的是列表, 而非元素, 其get方法返回的是一个Iterable对象

案例: 针对每个传感器输出最高的三个水位值

class ListStateExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.socketTextStream("server.passnight.local", 30000)
                .map(new WaterSensorMapper())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(((element, recordTimestamp) -> element.getTs() * 1000)))
                .keyBy(WaterSensor::getId)
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    ListState<Integer> top3Vc;

                    @Override
                    public void open(Configuration parameters) {
                        top3Vc = getRuntimeContext().getListState(new ListStateDescriptor<>("top3Vc", Types.INT));
                    }

                    @Override
                    public void processElement(WaterSensor waterSensor, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                        // 每来一条数据, 更新top3值
                        top3Vc.add(waterSensor.getVc());
                        List<Integer> newTop3 = StreamSupport.stream(top3Vc.get().spliterator(), false)
                                .sorted(Comparator.reverseOrder())
                                .limit(3)
                                .collect(Collectors.toUnmodifiableList());
                        top3Vc.update(newTop3);
                        out.collect(String.format("[传感器id %s], top3水位值为: [%s]", waterSensor.getId(), newTop3));
                    }
                }).print();
        env.execute();
    }
}

输入输出为:

s1 1 1
# [传感器id s1], top3水位值为: [[1]]
s1 2 2
# [传感器id s1], top3水位值为: [[2, 1]]
s1 3 3
# [传感器id s1], top3水位值为: [[3, 2, 1]]
s1 4 4 # 每次只保留top3
# [传感器id s1], top3水位值为: [[4, 3, 2]]
s2 2 2 # 可以看到列表状态是有分组的
# [传感器id s2], top3水位值为: [[2]]

Map状态

类似于java的map

案例: 统计每种水位值出现的次数

class MapStateExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.socketTextStream("server.passnight.local", 30000)
                .map(new WaterSensorMapper())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(((element, recordTimestamp) -> element.getTs() * 1000)))
                .keyBy(WaterSensor::getId)
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    MapState<Integer, Integer> frequency;

                    @Override
                    public void open(Configuration parameters) {
                        frequency = getRuntimeContext().getMapState(new MapStateDescriptor<>("frequency", Types.INT, Types.INT));
                    }

                    @Override
                    public void processElement(WaterSensor waterSensor, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                        // 每来一条数据, 更新top3值
                        if (!frequency.contains(waterSensor.getVc())) {
                            frequency.put(waterSensor.getVc(), 1);
                        } else {
                            frequency.put(waterSensor.getVc(), frequency.get(waterSensor.getVc()) + 1);
                        }
                        out.collect(String.format("[传感器id %s], 水位频率为: [%s]", waterSensor.getId(),
                                StreamSupport.stream(frequency.entries().spliterator(), false)
                                        .map(String::valueOf)
                                        .collect(Collectors.joining(","))));
                    }
                }).print();
        env.execute();
    }
}

输入输出为:

s1 1 1
# [传感器id s1], 水位频率为: [1=1]
s1 2 2
# [传感器id s1], 水位频率为: [1=1,2=1]
s1 1 1
# [传感器id s1], 水位频率为: [1=2,2=1]
s1 1 1
# [传感器id s1], 水位频率为: [1=3,2=1]
s2 1 1 # 可以看到Map状态也按键分区
# [传感器id s2], 水位频率为: [1=1]
s3 1 3
# [传感器id s3], 水位频率为: [3=1]

规约状态

类似于list状态, 但是对于进入的数据都立即规约

案例: 计算每种传感器的水位和

class ReduceStateExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.socketTextStream("server.passnight.local", 30000)
                .map(new WaterSensorMapper())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(((element, recordTimestamp) -> element.getTs() * 1000)))
                .keyBy(WaterSensor::getId)
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    ReducingState<Integer> vcSum;

                    @Override
                    public void open(Configuration parameters) {
                        vcSum = getRuntimeContext().getReducingState(new ReducingStateDescriptor<>("vcSum", Integer::sum, Types.INT));
                    }

                    @Override
                    public void processElement(WaterSensor waterSensor, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                        vcSum.add(waterSensor.getVc());
                        out.collect(String.format("[传感器id %s], 水位和为: [%d]", waterSensor.getId(), vcSum.get()));
                    }
                }).print();
        env.execute();
    }
}

输入为:

s1 1 1
s1 2 2
s1 3 10
s2 1 1
s2 2 10

输出为, 可以看到已经根据key分组并求和规约:

[传感器id s1], 水位和为: [1]
[传感器id s1], 水位和为: [3]
[传感器id s1], 水位和为: [13]
[传感器id s2], 水位和为: [1]
[传感器id s2], 水位和为: [11]

聚合状态

聚合状态类似于聚合操作, 同reduce一样, 累加器, 输入输出值类型可以不一样

案例: 计算每种传感器的平均水位

class AggregationStateExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.socketTextStream("server.passnight.local", 30000)
                .map(new WaterSensorMapper())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(((element, recordTimestamp) -> element.getTs() * 1000)))
                .keyBy(WaterSensor::getId)
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    AggregatingState<Integer, Double> vcAvg;

                    @Override
                    public void open(Configuration parameters) {
                        vcAvg = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>("vcAvg",
                                new AggregateFunction<>() {
                                    @Override
                                    public Tuple2<Integer, Integer> createAccumulator() {
                                        return Tuple2.of(0, 0);
                                    }

                                    @Override
                                    public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
                                        return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
                                    }

                                    @Override
                                    public Double getResult(Tuple2<Integer, Integer> accumulator) {
                                        return accumulator.f0 * 1D / accumulator.f1;
                                    }

                                    @Override
                                    public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                                        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
                                    }
                                },
                                Types.TUPLE(Types.INT, Types.INT)));
                    }

                    @Override
                    public void processElement(WaterSensor waterSensor, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                        vcAvg.add(waterSensor.getVc());
                        out.collect(String.format("[传感器id %s], 水位平均值: %d", waterSensor.getId(), vcAvg.get()));
                    }
                }).print();
        env.execute();
    }
}

输入为:

s1 1 1
s1 1 3
s1 1 3
s2 1 1
s2 1 4
s1 1 5

输出为, 可以看到已经实现分组求平均值了:

[传感器id s1], 水位平均值: 1.000000
[传感器id s1], 水位平均值: 2.000000
[传感器id s1], 水位平均值: 2.333333
[传感器id s2], 水位平均值: 1.000000
[传感器id s2], 水位平均值: 2.500000
[传感器id s1], 水位平均值: 3.000000

状态生存时间

  1. 一般情况下, 可以调用clear清理状态
  2. 但是有的时候, 不能直接清楚, 这是需要配置一个状态的生存时间(time to live), 到期后清楚状态
  3. 状态的删除并不是开一个线程不断扫描状态是否过期, 而是给状态附加一个属性, 当对状态访问或修改时可以对失效时间更新, 当清除条件被触发时, 就可以判断状态是否失效, 并进行清除
  4. 状态生存时间主要有几个配置:
    1. newBuilder: 设定生存时间
    2. setUpdateType: 指定什么时候更新失效时间, 可以指定OnCreateAndWrite/OnReadAndWrite等, 默认是前者
    3. setStateVisibility: 设置可见性, 因为清除并不是实时的, 所以当状态过期后依旧可能访问到未清除的过期状态, 此时可以配置NeverReturnExpired表示从不返回过期值即过期即清除; 还可以配置ReturnExpireDefNotCleanedUp表示若未清理, 则返回它的值
class ValueStateTtl {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.socketTextStream("server.passnight.local", 30000)
                .map(new WaterSensorMapper())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(((element, recordTimestamp) -> element.getTs() * 1000)))
                .keyBy(WaterSensor::getId)
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    ValueState<Integer> lastVc;

                    @Override
                    public void open(Configuration parameters) {
                        StateTtlConfig ttlConfig = StateTtlConfig
                                .newBuilder(Time.seconds(5)) // 过期时间为5s
                                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 创建/写入(更新)时更新过期时间
                                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期的状态值
                                .build();
                        ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("lastVc", Types.INT);
                        stateDescriptor.enableTimeToLive(ttlConfig);
                        lastVc = getRuntimeContext().getState(stateDescriptor);
                    }

                    @Override
                    public void processElement(WaterSensor waterSensor, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                        out.collect(String.format("[传感器id %s], 当前水位值: %d, 与上一条水位值: %d", waterSensor.getId(), waterSensor.getVc(), lastVc.value()));
                        lastVc.update(waterSensor.getVc());
                    }
                }).print();
        env.execute();
    }
}

输入输出为:

s1 1 1
[传感器id s1], 当前水位值: 1, 与上一条水位值: null
s1 2 2 # 等待5s, 状态被清除了
[传感器id s1], 当前水位值: 2, 与上一条水位值: null
s1 3 3
[传感器id s1], 当前水位值: 3, 与上一条水位值: 2
s1 4 4 # 等待5s, 状态被清除了
[传感器id s1], 当前水位值: 4, 与上一条水位值: null
s1 5 5
[传感器id s1], 当前水位值: 5, 与上一条水位值: 4
s1 6 6
[传感器id s1], 当前水位值: 6, 与上一条水位值: 5
s1 7 7
[传感器id s1], 当前水位值: 7, 与上一条水位值: 6

算子状态

  1. 算子状态是算子并行实例上定义的状态, 作用范围被限定为当前算子任务
  2. 算子状态和key无关, 只要key数据被分发到同一个并行算子任务, 就会访问同一个Operator State

列表状态&联合列表状态

案例:map算子中计算数据的条数

这里使用List而非UnionList, 因为后者在重分区会将合并后的状态发送到所有分区, 而前者在合并后, 通过轮询分配到新的分区; 后者存在资源浪费的

class OperatorListStateExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.socketTextStream("server.passnight.local", 30000)
                .map(new MyCountMapFunction())
                .print();
        env.execute();
    }

    // 要使用算子状态, 需要实现`CheckpointedFunction`
    public static class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction {

        private Long count = 0L;
        private ListState<Long> countState;

        @Override
        public Long map(String value) throws Exception {
            return count++;

        }

        /**
         * 保存状态快照, 即将本地变量拷贝到算子状态中
         *
         * @param context the context for drawing a snapshot of the operator
         * @throws Exception 异常
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("保存状态快照");

            // 清空算子状态
            countState.clear();
            // 将本地变量添加到算子状态中
            countState.add(count);
        }

        /**
         * 初始化状态, 从状态中把数据拷贝到本地变量, 每个子任务会调用一次
         *
         * @param context the context for initializing the operator
         * @throws Exception 异常
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("初始化装状态");
            // 使用上下文初始化算子状态
            countState = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("countState", Types.LONG));
            if (context.isRestored()) {
                for (Long l : countState.get()) {
                    count += l;
                }
            }
        }
    }
}

输出为:

# 程序开始执行时, 会调用"并行度次"初始化状态方法
初始化装状态
初始化装状态
# 在不断输入数据后(任意数据), 可以看到不同分区会访问并累加同一个状态
2> 0
1> 0
2> 1
1> 1
2> 2
1> 2
2> 3
1> 3

广播状态

  1. 列表状态和联合列表状态在不同的子任务中具有独立的拷贝, 而广播状态会将状态广播到所有的子任务, 所有并行的子任务都会访问同一状态
  2. 因为广播状态是全局一致的, 因此并行度改变时, 只需要简单拷贝状态或删除状态即可

案例: 水位超过指定阈值则发送告警, 其中阈值可以修改

class OperationBroadcastExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        SingleOutputStreamOperator<WaterSensor> sensorDs = env.socketTextStream("server.passnight.local", 30000)
                .map(new WaterSensorMapper());

        DataStreamSource<String> configDs = env.socketTextStream("server.passnight.local", 30001);

        // 广播配置
        BroadcastStream<String> configBs = configDs.broadcast(new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT));

        // 把数据流和广播后的配置流连接起来
        BroadcastConnectedStream<WaterSensor, String> sensorBcs = sensorDs.connect(configBs);

        //  调用广播链接流
        sensorBcs.process(new BroadcastProcessFunction<WaterSensor, String, String>() {
            /**
             * 数据流的处理方法
             * @param waterSensor The stream element.
             * @param ctx A {@link ReadOnlyContext} that allows querying the timestamp of the element,
             *     querying the current processing/event time and updating the broadcast state. The context
             *     is only valid during the invocation of this method, do not store it.
             * @param out The collector to emit resulting elements to
             * @throws Exception 父类异常
             */
            @Override
            public void processElement(WaterSensor waterSensor, BroadcastProcessFunction<WaterSensor, String, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {
                ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT));
                Integer threshold = Objects.requireNonNullElse(broadcastState.get("threshold"), 0);
                if (waterSensor.getVc() > threshold) {
                    out.collect(String.format("[%s], 水位大于阈值%d", waterSensor, threshold));
                }
            }

            /**
             * 广播配置流的处理方法
             * @param value The stream element.
             * @param ctx A {@link Context} that allows querying the timestamp of the element, querying the
             *     current processing/event time and updating the broadcast state. The context is only valid
             *     during the invocation of this method, do not store it.
             * @param out The collector to emit resulting elements to
             * @throws Exception 父类异常
             */
            @Override
            public void processBroadcastElement(String value, BroadcastProcessFunction<WaterSensor, String, String>.Context ctx, Collector<String> out) throws Exception {
                // 获取广播状态
                BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT));
                // 往广播流中写数据
                broadcastState.put("threshold", Integer.valueOf(value));
            }
        }).print();
        env.execute();
    }
}

输入输出为:

s1 1 1 # 初始为空, Objects.requireNonNullElse(broadcastState.get("threshold"), 0);
`2> [WaterSensor(id=s1, ts=1, vc=1)], 水位大于阈值0`
10 # 30001端口输入10, 将阈值改为10 
s1 1 1 # 1小于新阈值, 不输出
s1 1 15 # 15大于阈值10, 输出
`2> [WaterSensor(id=s1, ts=1, vc=15)], 水位大于阈值10`

状态后端

  1. 在flink中, 对状态的存储/访问/维护状态是由一个可拔插组件提供的, 这个组件就是状态后端, 其主要职责是: 管理本地状态的存储方式和配置
  2. 状态后端主要可以分为两个:
    1. HashMapStateBackend是把状态保存在内存当中, 他会将状态保存在Taskmanager的JVM堆上
    2. RockDB: 是一个内嵌的Key-Value存储数据库, 可以将数据持久化到磁盘中, 配置了EmbeddedRocksDBStateBackends后, flink就会将处理中的数据放入RocksDB中, 而RockDB默认存储在TaskMaanger的本地数据目录中; RocksDB保存的是被序列化的字节数组, 因此读写需要序列化和反序列化, 由于需要访问磁盘和序列化的缘故, 其性能较差 但其始终执行的是异步快照, 还提供增量式保存检查点, 所以性能也不会太差
    3. 状态后端可以使用setStateBackend进行配置或指定提交参数-D/flink-config.yaml中配置state.back.type
    4. 注意: 使用rocksDB需要导入依赖flink-statebackend-rocksdb

推荐阅读