【flink状态管理(四)】MemoryStateBackend的实现-1.基于MemoryStateBackend创建KeyedStateBackend
最编程
2024-02-15 17:30:54
...
1.1. 状态初始化
AbstractStreamOperator.keyedStatedBackend()方法定义了创建和初始化KeyedStatedBackend的逻辑,具体如下。
protected <K> AbstractKeyedStateBackend<K> keyedStateBackend(
TypeSerializer<K> keySerializer,
String operatorIdentifierText,
PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,
CloseableRegistry backendCloseableRegistry,
MetricGroup metricGroup) throws Exception {
if (keySerializer == null) {
return null;
}
String logDescription = "keyed state backend for " + operatorIdentifierText;
//1.
TaskInfo taskInfo = environment.getTaskInfo();
final KeyGroupRange keyGroupRange =
KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
taskInfo.getMaxNumberOfParallelSubtasks(),
taskInfo.getNumberOfParallelSubtasks(),
taskInfo.getIndexOfThisSubtask());
// 确保恢复状态过程中构建的数据流被关闭
CloseableRegistry cancelStreamRegistryForRestore = new CloseableRegistry();
backendCloseableRegistry.registerCloseable(cancelStreamRegistryForRestore);
// 创建BackendRestorerProcedure
BackendRestorerProcedure<AbstractKeyedStateBackend<K>, KeyedStateHandle>
backendRestorer =
new BackendRestorerProcedure<>(
(stateHandles) -> stateBackend.createKeyedStateBackend(
environment,
environment.getJobID(),
operatorIdentifierText,
keySerializer,
taskInfo.getMaxNumberOfParallelSubtasks(),
keyGroupRange,
environment.getTaskKvStateRegistry(),
TtlTimeProvider.DEFAULT,
metricGroup,
stateHandles,
cancelStreamRegistryForRestore),
backendCloseableRegistry,
logDescription);
try {
return backendRestorer.createAndRestore(
prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());
} finally {
if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryFor
Restore)) {
IOUtils.closeQuietly(cancelStreamRegistryForRestore);
}
}
}
- 获取当前Task的TaskInfo,并基于TaskInfo的参数创建KeyGroupRange,表示当前Task实例中存储的Key分组区间。
- 创建CloseableRegistry并注册到backendCloseableRegistry中,用于确保在任务取消的情况下关闭在恢复状态过程中构造的数据流。
- 创建BackendRestorerProcedure,提供了stateBackend.createKeyedStateBackend()方法,也包含恢复历史状态数据的方法。
- 创建KeyedStateBackend,同时对状态数据进行恢复。prioritizedOperatorSubtaskStates是从TaskStateManager中根据OperatorID获取的算子历史状态,通过prioritizedOperatorSubtaskStates获取当前算子的PrioritizedManagedKeyedState,并基于这些状态数据恢复算子的状态。
1.2. 创建状态
接下来我们看MemoryStateBackend.createKeyedStateBackend()方法的具体实现。
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry,
TtlTimeProvider ttlTimeProvider,
MetricGroup metricGroup,
@Nonnull Collection<KeyedStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {
// 获取TaskStateManager实例
TaskStateManager taskStateManager = env.getTaskStateManager();
// 创建HeapPriorityQueueSetFactory实例
HeapPriorityQueueSetFactory priorityQueueSetFactory =
new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
// 创建HeapKeyedStateBackendBuilder实例HeapKeyedStateBackend
return new HeapKeyedStateBackendBuilder<>(
kvStateRegistry,
keySerializer,
env.getUserClassLoader(),
numberOfKeyGroups,
keyGroupRange,
env.getExecutionConfig(),
ttlTimeProvider,
stateHandles,
AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),
taskStateManager.createLocalRecoveryConfig(),
priorityQueueSetFactory,
isUsingAsynchronousSnapshots(),
cancelStreamRegistry).build();
}
- 从environment参数中获取TaskStateManager实例
- 创建HeapPriorityQueueSetFactory实例,用于生成HeapPriorityQueueSet优先级队列,存储TimerHeapInternalTimer等数据。
- 调用HeapKeyedStateBackendBuilder.build()方法创建HeapKeyedStateBackend。
上一篇: 让CSS行高和高度一致的设置方法
推荐阅读
-
【flink状态管理(四)】MemoryStateBackend的实现-1.基于MemoryStateBackend创建KeyedStateBackend
-
玩转Java底层:JMX详解 - jconsole与自定义MBean监控工具的实际应用与区别" 在日常JVM调优中,我们熟知的jconsole工具通过JMX包装的bean以图形化形式展示管理数据,而像jstat和jmap这类内建监控工具则由JVM直接支持。本文将以jconsole为例,深入讲解其实质——基于JMX的MBean功能,包括可视化界面上的bean属性查看和操作调用。 MBeans在jconsole中的体现是那些可观察的组件属性和方法,如上图所示,通过名为"Verbose"的属性能看到其值为false,同时还能直接操作该bean的方法,例如"closeJerryMBean"。 尽管jconsole给我们提供了直观的可视化界面,但请注意,这里的MBean并非固定不变,开发者可根据JMX提供的接口将自己的自定义bean展示到jconsole。以下步骤展示了如何创建并注册一个名为"StudyJavaMBean"的自定义MBean: 1. 首先定义接口`StudyJavaMBean`,接口需遵循MBean规范,即后缀为"MBean"且包含getter方法代表属性,如`getApplicationName`,和无返回值的setter方法代表操作,如`closeJerryMBean`。 ```java public interface StudyJavaMBean { String getApplicationName(); void closeJerryMBean(); } ``` 2. 编写接口的实现类`StudyJavaMBeanImpl`,实现接口中的方法: ```java public class StudyJavaMBeanImpl implements StudyJavaMBean { @Override public String getApplicationName() { return "每天学Java"; } @Override public void closeJerryMBean() { System.out.println("关闭Jerry应用"); } } ``` 3. 在代码中注册自定义MBean,涉及的关键步骤包括: - 获取平台MBeanServer - 定义ObjectName,指定唯一的MBean标识符 - 注册MBean到服务器 - 启动RMI连接器服务,以便jconsole能够访问 ```java public void registerMBean() throws Exception { // ... 具体实现省略 ... } ``` 实际运行注册后的MBean,您将在jconsole中发现并查看自定义bean的属性和调用相关方法。然而,这种方式相较于传统的属性/日志查看和HTTP接口,实用性相对有限,可能存在潜在的安全风险。但不可否认的是,JMX及其MBean机制对于获取操作系统信息、内存状态等关键性能指标仍然具有重要价值。例如: 1. **获取操作系统信息**:通过JMX MBean,可以直接获取到诸如CPU使用率、操作系统版本等系统级信息,这对于资源管理和优化工作具有显著帮助。