玩转Flink:在Yarn集群中进行会话、单作业和应用模式的部署
星光下的赶路人star的个人主页
知世故而不世故 是善良的成熟
文章目录
- 1、Flink部署
- 1.1 集群角色
- 1.2 Flink集群搭建
- 1.2.1 集群启动
- 1.2.2 向集群提交作业
- 1.3 部署模式
- 1.3.1 会话模式(Session Mode)
- 1.3.2 单作业模式(Per-Job Mode)
- 1.3.3 应用模式(Application Mode)
- 1.4 Standalone运行模式
- 1.4.1 会话模式部署
- 1.4.2 单作业模式部署
- 1.4.3 应用模式部署
- 1.5 Yarn运行模式(非常重要)
- 1.5.1 相关准备和配置
- 1.5.2 会话模式部署
- 1.5.3 单作业模式部署
- 1.5.4 应用模式部署
- 1.6 K8S运行模式(了解)
- 1.7 历史服务器
1、Flink部署
1.1 集群角色
Flink提交作业和执行任务,需要几个关键组件:
客户端(Client):代码由客户端获取并做转换,之后提交给JobManager
JobManager就是Flink集群里的“管事人”,对作业进行*调度管理,而它获取到要执行的作业后,会进一步处理转换,然后分发给众多的TaskManager。
TaskManager,就是真正的“干活的人”,数据的处理操作都是它们来做的
- 。
注意:Flink是一个非常灵活的处理框架,它支持多种不同的部署场景,还可以和不同的资源管理平台方便地集成。
1.2 Flink集群搭建
1.2.1 集群启动
0、集群规划
节点服务器 | hadoop102 | hadoop103 | hadoop104 |
角色 | JobManager、TaskManager | TaskManager | TaskManager |
具体安装部署步骤如下:
1、下载并解压安装包
(1)下载(直接去官网下)安装包flink-1.17.0-bin-scala_2.12.tgz,将该jar包上传到hadoop102节点服务器的/opt/software路径上。
(2)在/opt/software路径上解压安装包到/opt/module路径上
tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/ • 1
2、修改集群配置
(1)进入conf路径,修改flink-conf.yaml文件,指定hadoop102节点服务器为JobManager
vim flink-conf.yaml • 1
修改如下内容:
# JobManager节点地址. jobmanager.rpc.address: hadoop102 jobmanager.bind-host: 0.0.0.0 rest.address: hadoop102 rest.bind-address: 0.0.0.0 # TaskManager节点地址.需要配置为当前机器名 taskmanager.bind-host: 0.0.0.0 taskmanager.host: hadoop102
2)修改workers文件,指定hadoop102、hadoop103和hadoop104为TaskManager
vim workers • 1
修改如下内容:
hadoop102 hadoop103 hadoop104
3)修改master文件
vim masters • 1
修改内容如下:
hadoop102:8081 • 1
(4)另外,在flink-conf.yaml文件中还可以对集群中的JobManager和TaskManager组件进行优化配置,主要配置项如下:
jobmanager.memory.process.size:对JobManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1600M,可以根据集群规模进行适当调整。
taskmanager.memory.process.size:对TaskManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1728M,可以根据集群规模进行适当调整。
taskmanager.numberOfTaskSlots:对每个TaskManager能够分配的Slot数量进行配置,默认为1,可根据TaskManager所在的机器能够提供给Flink的CPU数量决定。所谓Slot就是TaskManager中具体运行一个任务所分配的计算资源。
parallelism.default:Flink任务执行的并行度,默认为1。优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。
3、分发安装目录
(1)配置修改完毕后,将Flink安装目录发给另外两个节点服务器。
xsync flink-1.17.0/ • 1
(2)修改hadoop103的 taskmanager.host
修改如下内容:
# TaskManager节点地址.需要配置为当前机器名 taskmanager.host: hadoop103 • 1 • 2
(3)修改hadoop104的 taskmanager.host(和上面差不多只是改为hadoop104而已)
4、启动集群
(1)在hadoop102节点服务器上执行start-cluster.sh启动Flink集群:
bin/start-cluster.sh • 1
(2)查看进程情况:
5、访问WebUI
启动成功后,同样可以访问http://hadoop102:8081对Flink集群和任务进行监控管理。
这样可以明显看到,当前集群的TaskManager数量为3;由于默认每个TaskManager的Slot数量为1,所以总Slot数和可用Slot数都为3。
1.2.2 向集群提交作业
在上一章中,我们已经编写读取socket发送的单词并统计单词的个数程序案例。本节我们将以该程序为例,演示如何将任务提交到集群中进行执行。具体步骤如下。
1、环境准备
在hadoop102中执行以下命令启动netcat。
nc -lk hadoop102 7777 • 1
2、程序打包
(1)在我们编写的Flink入门程序的pom.xml文件中添加打包插件的配置,具体如下:
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <artifactSet> <excludes> <exclude>com.google.code.findbugs:jsr305</exclude> <exclude>org.slf4j:*</exclude> <exclude>log4j:*</exclude> </excludes> </artifactSet> <filters> <filter> <!-- Do not copy the signatures in the META-INF folder. Otherwise, this might cause SecurityExceptions when using the JAR. --> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers combine.children="append"> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>
(2)插件配置完毕后,可以使用IDEA的Maven工具执行package命令,出现如下提示即表示打包成功。
打包完成后,在target目录下即可找到所需JAR包,JAR包会有两个,FlinkDemo-1.0-SNAPSHOT.jar和original-FlinkDemo-1.0-SNAPSHOT.jar,因为集群中已经具备任务运行所需的所有依赖,所以建议使用FlinkDemo-1.0-SNAPSHOT.jar。
3、在WebUI上提交作业
(1)任务打包完成后,我们打开Flink的WEB UI页面,在右侧导航栏点击“Submit New Job”,然后点击按钮“+ Add New”,选择要上传运行的JAR包,如下图所示。
jar包上传完成,如下图所示
(2)点击该jar包,出现任务配置页面,进行相应的配置
主要配置程序入口主类的全类名,任务运行的并行度,任务运行所需的配置参数和保存点路径等,如下图所示,配置完成后,即可点击按钮“Submit”,将任务提交到集群运行。
(3)任务提交成功之后,可点击左侧导航栏的“Running Jobs”查看程序运行列表情情况。
(4)测试
①在socket端口中输入hello