大数据]Flink 学习笔记 - 状态管理
-
在flink中, 算子任务可以分为有状态和无状态两种情况
-
无状态的算子任务只需要观察每个独立事件, 然后根据输入的数据直接转换成结果; 如
map
,filter
,flatMap
等不依赖其他数据的算子都属于无状态算子 -
而有状态算子的任务, 除了当前数据之外, 还需要一些其他的数据来得到计算结果, 这里的其他数据就是所谓的状态
-
对于有状态算子, 一般处理流程如下:
- 算子接受上游发来的数据
- 获取当前的状态
- 根据业务逻辑来计算/更新状态
- 得到计算结果, 输出发送到下游任务
-
状态主要可以分为: 托管状态和原始状态
- 托管状态: 有flink统一管理, 状态的存储访问/故障恢复和重组等一系列问题都由flink实现, 我们只需要调用接口
- 托管状态又可以分为两类: 算子状态和键控状态 后者是经过keyBy算子后的状态
- 原始状态: 是自己定义的, 相当于开辟了一块内存, 有我们自己实现状态管理/序列化和故障恢复
键控状态(Keyed State)
键控状态是按照键来访问和维护的状态
值状态 (Value State)
案例: 检测传感器的水位值, 若连续两个水位差值大于10则报警
输入输出为:
s1 1 1
s1 2 11
s1 3 30
# [传感器id s1], 当前水位值: 30, 与上一条水位值: 11, 相差超过10!!!
s1 4 10
# [传感器id s1], 当前水位值: 10, 与上一条水位值: 30, 相差超过10!!!
s5 5 30
# [传感器id s5], 当前水位值: 30, 与上一条水位值: 0, 相差超过10!!! (初始值为0, 因此输出, 也可以看到flink状态是按键分区的)
列表状态
列表状态存的是列表, 而非元素, 其get
方法返回的是一个Iterable
对象
案例: 针对每个传感器输出最高的三个水位值
class ListStateExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.socketTextStream("server.passnight.local", 30000)
.map(new WaterSensorMapper())
.assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(((element, recordTimestamp) -> element.getTs() * 1000)))
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
ListState<Integer> top3Vc;
@Override
public void open(Configuration parameters) {
top3Vc = getRuntimeContext().getListState(new ListStateDescriptor<>("top3Vc", Types.INT));
}
@Override
public void processElement(WaterSensor waterSensor, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
// 每来一条数据, 更新top3值
top3Vc.add(waterSensor.getVc());
List<Integer> newTop3 = StreamSupport.stream(top3Vc.get().spliterator(), false)
.sorted(Comparator.reverseOrder())
.limit(3)
.collect(Collectors.toUnmodifiableList());
top3Vc.update(newTop3);
out.collect(String.format("[传感器id %s], top3水位值为: [%s]", waterSensor.getId(), newTop3));
}
}).print();
env.execute();
}
}
输入输出为:
s1 1 1
# [传感器id s1], top3水位值为: [[1]]
s1 2 2
# [传感器id s1], top3水位值为: [[2, 1]]
s1 3 3
# [传感器id s1], top3水位值为: [[3, 2, 1]]
s1 4 4 # 每次只保留top3
# [传感器id s1], top3水位值为: [[4, 3, 2]]
s2 2 2 # 可以看到列表状态是有分组的
# [传感器id s2], top3水位值为: [[2]]
Map状态
类似于java的map
案例: 统计每种水位值出现的次数
class MapStateExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.socketTextStream("server.passnight.local", 30000)
.map(new WaterSensorMapper())
.assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(((element, recordTimestamp) -> element.getTs() * 1000)))
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
MapState<Integer, Integer> frequency;
@Override
public void open(Configuration parameters) {
frequency = getRuntimeContext().getMapState(new MapStateDescriptor<>("frequency", Types.INT, Types.INT));
}
@Override
public void processElement(WaterSensor waterSensor, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
// 每来一条数据, 更新top3值
if (!frequency.contains(waterSensor.getVc())) {
frequency.put(waterSensor.getVc(), 1);
} else {
frequency.put(waterSensor.getVc(), frequency.get(waterSensor.getVc()) + 1);
}
out.collect(String.format("[传感器id %s], 水位频率为: [%s]", waterSensor.getId(),
StreamSupport.stream(frequency.entries().spliterator(), false)
.map(String::valueOf)
.collect(Collectors.joining(","))));
}
}).print();
env.execute();
}
}
输入输出为:
s1 1 1
# [传感器id s1], 水位频率为: [1=1]
s1 2 2
# [传感器id s1], 水位频率为: [1=1,2=1]
s1 1 1
# [传感器id s1], 水位频率为: [1=2,2=1]
s1 1 1
# [传感器id s1], 水位频率为: [1=3,2=1]
s2 1 1 # 可以看到Map状态也按键分区
# [传感器id s2], 水位频率为: [1=1]
s3 1 3
# [传感器id s3], 水位频率为: [3=1]
规约状态
类似于list状态, 但是对于进入的数据都立即规约
案例: 计算每种传感器的水位和
class ReduceStateExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.socketTextStream("server.passnight.local", 30000)
.map(new WaterSensorMapper())
.assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(((element, recordTimestamp) -> element.getTs() * 1000)))
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
ReducingState<Integer> vcSum;
@Override
public void open(Configuration parameters) {
vcSum = getRuntimeContext().getReducingState(new ReducingStateDescriptor<>("vcSum", Integer::sum, Types.INT));
}
@Override
public void processElement(WaterSensor waterSensor, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
vcSum.add(waterSensor.getVc());
out.collect(String.format("[传感器id %s], 水位和为: [%d]", waterSensor.getId(), vcSum.get()));
}
}).print();
env.execute();
}
}
输入为:
s1 1 1
s1 2 2
s1 3 10
s2 1 1
s2 2 10
输出为, 可以看到已经根据key分组并求和规约:
[传感器id s1], 水位和为: [1]
[传感器id s1], 水位和为: [3]
[传感器id s1], 水位和为: [13]
[传感器id s2], 水位和为: [1]
[传感器id s2], 水位和为: [11]
聚合状态
聚合状态类似于聚合操作, 同reduce一样, 累加器, 输入输出值类型可以不一样
案例: 计算每种传感器的平均水位
class AggregationStateExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.socketTextStream("server.passnight.local", 30000)
.map(new WaterSensorMapper())
.assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(((element, recordTimestamp) -> element.getTs() * 1000)))
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
AggregatingState<Integer, Double> vcAvg;
@Override
public void open(Configuration parameters) {
vcAvg = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>("vcAvg",
new AggregateFunction<>() {
@Override
public Tuple2<Integer, Integer> createAccumulator() {
return Tuple2.of(0, 0);
}
@Override
public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
}
@Override
public Double getResult(Tuple2<Integer, Integer> accumulator) {
return accumulator.f0 * 1D / accumulator.f1;
}
@Override
public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
}
},
Types.TUPLE(Types.INT, Types.INT)));
}
@Override
public void processElement(WaterSensor waterSensor, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
vcAvg.add(waterSensor.getVc());
out.collect(String.format("[传感器id %s], 水位平均值: %d", waterSensor.getId(), vcAvg.get()));
}
}).print();
env.execute();
}
}
输入为:
s1 1 1
s1 1 3
s1 1 3
s2 1 1
s2 1 4
s1 1 5
输出为, 可以看到已经实现分组求平均值了:
[传感器id s1], 水位平均值: 1.000000
[传感器id s1], 水位平均值: 2.000000
[传感器id s1], 水位平均值: 2.333333
[传感器id s2], 水位平均值: 1.000000
[传感器id s2], 水位平均值: 2.500000
[传感器id s1], 水位平均值: 3.000000
状态生存时间
- 一般情况下, 可以调用
clear
清理状态 - 但是有的时候, 不能直接清楚, 这是需要配置一个状态的生存时间(time to live), 到期后清楚状态
- 状态的删除并不是开一个线程不断扫描状态是否过期, 而是给状态附加一个属性, 当对状态访问或修改时可以对失效时间更新, 当清除条件被触发时, 就可以判断状态是否失效, 并进行清除
- 状态生存时间主要有几个配置:
-
newBuilder
: 设定生存时间 -
setUpdateType
: 指定什么时候更新失效时间, 可以指定OnCreateAndWrite
/OnReadAndWrite
等, 默认是前者 -
setStateVisibility
: 设置可见性, 因为清除并不是实时的, 所以当状态过期后依旧可能访问到未清除的过期状态, 此时可以配置NeverReturnExpired
表示从不返回过期值即过期即清除; 还可以配置ReturnExpireDefNotCleanedUp
表示若未清理, 则返回它的值
-
class ValueStateTtl {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.socketTextStream("server.passnight.local", 30000)
.map(new WaterSensorMapper())
.assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(((element, recordTimestamp) -> element.getTs() * 1000)))
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
ValueState<Integer> lastVc;
@Override
public void open(Configuration parameters) {
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(5)) // 过期时间为5s
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 创建/写入(更新)时更新过期时间
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期的状态值
.build();
ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("lastVc", Types.INT);
stateDescriptor.enableTimeToLive(ttlConfig);
lastVc = getRuntimeContext().getState(stateDescriptor);
}
@Override
public void processElement(WaterSensor waterSensor, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
out.collect(String.format("[传感器id %s], 当前水位值: %d, 与上一条水位值: %d", waterSensor.getId(), waterSensor.getVc(), lastVc.value()));
lastVc.update(waterSensor.getVc());
}
}).print();
env.execute();
}
}
输入输出为:
s1 1 1
[传感器id s1], 当前水位值: 1, 与上一条水位值: null
s1 2 2 # 等待5s, 状态被清除了
[传感器id s1], 当前水位值: 2, 与上一条水位值: null
s1 3 3
[传感器id s1], 当前水位值: 3, 与上一条水位值: 2
s1 4 4 # 等待5s, 状态被清除了
[传感器id s1], 当前水位值: 4, 与上一条水位值: null
s1 5 5
[传感器id s1], 当前水位值: 5, 与上一条水位值: 4
s1 6 6
[传感器id s1], 当前水位值: 6, 与上一条水位值: 5
s1 7 7
[传感器id s1], 当前水位值: 7, 与上一条水位值: 6
算子状态
- 算子状态是算子并行实例上定义的状态, 作用范围被限定为当前算子任务
- 算子状态和key无关, 只要key数据被分发到同一个并行算子任务, 就会访问同一个Operator State
列表状态&联合列表状态
案例: 在map
算子中计算数据的条数
这里使用List
而非UnionList
, 因为后者在重分区会将合并后的状态发送到所有分区, 而前者在合并后, 通过轮询分配到新的分区; 后者存在资源浪费的
class OperatorListStateExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env.socketTextStream("server.passnight.local", 30000)
.map(new MyCountMapFunction())
.print();
env.execute();
}
// 要使用算子状态, 需要实现`CheckpointedFunction`
public static class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction {
private Long count = 0L;
private ListState<Long> countState;
@Override
public Long map(String value) throws Exception {
return count++;
}
/**
* 保存状态快照, 即将本地变量拷贝到算子状态中
*
* @param context the context for drawing a snapshot of the operator
* @throws Exception 异常
*/
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
System.out.println("保存状态快照");
// 清空算子状态
countState.clear();
// 将本地变量添加到算子状态中
countState.add(count);
}
/**
* 初始化状态, 从状态中把数据拷贝到本地变量, 每个子任务会调用一次
*
* @param context the context for initializing the operator
* @throws Exception 异常
*/
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
System.out.println("初始化装状态");
// 使用上下文初始化算子状态
countState = context.getOperatorStateStore()
.getListState(new ListStateDescriptor<>("countState", Types.LONG));
if (context.isRestored()) {
for (Long l : countState.get()) {
count += l;
}
}
}
}
}
输出为:
# 程序开始执行时, 会调用"并行度次"初始化状态方法
初始化装状态
初始化装状态
# 在不断输入数据后(任意数据), 可以看到不同分区会访问并累加同一个状态
2> 0
1> 0
2> 1
1> 1
2> 2
1> 2
2> 3
1> 3
广播状态
- 列表状态和联合列表状态在不同的子任务中具有独立的拷贝, 而广播状态会将状态广播到所有的子任务, 所有并行的子任务都会访问同一状态
- 因为广播状态是全局一致的, 因此并行度改变时, 只需要简单拷贝状态或删除状态即可
案例: 水位超过指定阈值则发送告警, 其中阈值可以修改
class OperationBroadcastExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
SingleOutputStreamOperator<WaterSensor> sensorDs = env.socketTextStream("server.passnight.local", 30000)
.map(new WaterSensorMapper());
DataStreamSource<String> configDs = env.socketTextStream("server.passnight.local", 30001);
// 广播配置
BroadcastStream<String> configBs = configDs.broadcast(new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT));
// 把数据流和广播后的配置流连接起来
BroadcastConnectedStream<WaterSensor, String> sensorBcs = sensorDs.connect(configBs);
// 调用广播链接流
sensorBcs.process(new BroadcastProcessFunction<WaterSensor, String, String>() {
/**
* 数据流的处理方法
* @param waterSensor The stream element.
* @param ctx A {@link ReadOnlyContext} that allows querying the timestamp of the element,
* querying the current processing/event time and updating the broadcast state. The context
* is only valid during the invocation of this method, do not store it.
* @param out The collector to emit resulting elements to
* @throws Exception 父类异常
*/
@Override
public void processElement(WaterSensor waterSensor, BroadcastProcessFunction<WaterSensor, String, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {
ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT));
Integer threshold = Objects.requireNonNullElse(broadcastState.get("threshold"), 0);
if (waterSensor.getVc() > threshold) {
out.collect(String.format("[%s], 水位大于阈值%d", waterSensor, threshold));
}
}
/**
* 广播配置流的处理方法
* @param value The stream element.
* @param ctx A {@link Context} that allows querying the timestamp of the element, querying the
* current processing/event time and updating the broadcast state. The context is only valid
* during the invocation of this method, do not store it.
* @param out The collector to emit resulting elements to
* @throws Exception 父类异常
*/
@Override
public void processBroadcastElement(String value, BroadcastProcessFunction<WaterSensor, String, String>.Context ctx, Collector<String> out) throws Exception {
// 获取广播状态
BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT));
// 往广播流中写数据
broadcastState.put("threshold", Integer.valueOf(value));
}
}).print();
env.execute();
}
}
输入输出为:
s1 1 1 # 初始为空, Objects.requireNonNullElse(broadcastState.get("threshold"), 0);
`2> [WaterSensor(id=s1, ts=1, vc=1)], 水位大于阈值0`
10 # 30001端口输入10, 将阈值改为10
s1 1 1 # 1小于新阈值, 不输出
s1 1 15 # 15大于阈值10, 输出
`2> [WaterSensor(id=s1, ts=1, vc=15)], 水位大于阈值10`
状态后端
- 在flink中, 对状态的存储/访问/维护状态是由一个可拔插组件提供的, 这个组件就是状态后端, 其主要职责是: 管理本地状态的存储方式和配置
- 状态后端主要可以分为两个:
-
HashMapStateBackend
是把状态保存在内存当中, 他会将状态保存在Taskmanager的JVM堆上 -
RockDB
: 是一个内嵌的Key-Value存储数据库, 可以将数据持久化到磁盘中, 配置了EmbeddedRocksDBStateBackends
后, flink就会将处理中的数据放入RocksDB中, 而RockDB默认存储在TaskMaanger的本地数据目录中; RocksDB保存的是被序列化的字节数组, 因此读写需要序列化和反序列化, 由于需要访问磁盘和序列化的缘故, 其性能较差 但其始终执行的是异步快照, 还提供增量式保存检查点, 所以性能也不会太差 - 状态后端可以使用
setStateBackend
进行配置或指定提交参数-D
/flink-config.yaml
中配置state.back.type
-
注意: 使用rocksDB需要导入依赖
flink-statebackend-rocksdb
-
上一篇: [算法] 计数排序-1.简介
下一篇: 计算机网络 (03) - 传输层
推荐阅读
-
数据结构(邓俊辉)学习笔记】数据结构(邓俊辉)学习笔记向量 02 - 动态空间管理-3.
-
大数据]Flink 学习笔记 - 状态管理
-
实时音频和视频技术的发展与应用-1.1 双重音频和视频 从架构上看,双人音视频系统相对简单明了。红点代表房间信令服务,房间信令服务的主要功能是管理房间信息,实现容量协商和上下行链路的质量调节,例如当下行信道发生拥塞时,上行线路的码率和分辨率会降低。 在传输信道层面,我们的策略是优先直连,在跨区域、跨运营商的情况下,我们会选择单中转或双中转信道,在策略上尽量保持直连和中转信道同时存在,当其中一个信道的质量不好时,系统会自动切断到另一个信道的流量。 1.2 多人音视频 多人视频通话的产品形态是整个房间不超过 50 人,大盘平均房间规模约为 4.x 人,房间内部最多满足一个大视频和三个小视频(四屏)。根据这一条件,我们在架构中采用了典型的 SFU 小房间设计。 上图中的红点代表房间信令服务,主要用于房间管理和状态信息同步。房间管理主要包括用户列表的管理,例如哪些用户打开了视频/音频,我看了谁,谁看了我,这些都是基于房间管理的信息,然后房间信令服务会将这些信息同步到媒体传输服务进行数据分发。 房间服务的另一个作用是房间级容量协商和质量控制,例如,房间里的每个人一开始都支持 H.265 编码,当某个时刻进来一个只支持 H.264 编码的用户时,房间里所有的上游主播就必须把 H.265 切成 H.264。还有一种情况是,房间里有一定比例的人下行链路信道质量较差,这会导致上行链路房间质量下降。 在传输层面,我们采用的是单层分布式媒体传输网络,大家都选择中转方式,不区分双人和多人,采用 Full-Mesh 传输机制将所有数据推送过去,比如一个节点上的人并不都看另外两个人的视频,但还是会将视频推送给他们。