大数据]Flink CDC 实时同步 mysql 数据
目录
一、前言
二、Flink CDC介绍
2.1 什么是Flink CDC
2.2 Flink CDC 特点
2.3 Flink CDC 核心工作原理
2.4 Flink CDC 使用场景
三、常用的数据同步方案对比
3.1 数据同步概述
3.1.1 数据同步来源
3.2 常用的数据同步方案汇总
3.3 为什么推荐Flink CDC
3.4 Flink CDC 适用范围
3.5 Flink CDC不同版本对比
3.5.1 Flink CDC 1.x
3.5.2 Flink CDC 2.x
3.5.3 Flink CDC 3.x
四、Java使用Flink CDC同步mysql数据
4.1 环境准备
4.1.1 组件版本说明
4.1.2 数据库准备
4.1.3 导入相关的依赖
4.2 使用Flink CDC动态监听mysql数据变化
4.2.1 自定义反序列化器
4.2.2 自定义Sink输出
4.2.3 启动任务类
4.2.4 效果测试
4.3 与springboot整合实现过程
4.3.1 补充依赖
4.3.2 启动类改造
4.3.3 效果测试
五、写在文末
一、前言
在微服务系统架构中,经常会涉及到跨系统的数据同步,比如需要从A系统的mysql数据库同步到B系统的oracle数据库,再比如说需要将mysql中的数据同步到es中,更有一些场景下,涉及到多个系统的异构数据源,需要加工处理后将数据推送到kafka中。在这些场景下,需求各异,需要满足的场景也不尽相同。很难有某一种开源工具可以满足所有的场景,接下来本文将介绍使用flinkcdc的方式同步mysql的数据。
二、Flink CDC介绍
2.1 什么是Flink CDC
Flink CDC(Change Data Capture)是一个用于实时捕获数据库变更事件的工具,它是基于 Apache Flink 构建的。Flink CDC 可以从关系型数据库中实时捕获表的数据变更事件,并将这些事件转化为流式数据,以便进行实时处理和分析。这对于实时数据仓库、实时数据分析以及数据同步等场景非常有用。
Flink-CDC 开源地址: Apache/Flink-CDC
Flink-CDC 中文文档:Apache Flink CDC | Apache Flink CDC
Flink CDC connector 可以捕获在一个或多个表中发生的所有变更。该模式通常有一个前记录和一个后记录。Flink CDC connector 可以直接在Flink中以非约束模式(流)使用,而不需要使用类似 kafka 之类的中间件中转数据。
Flink SQL CDC 内置了 Debezium 引擎,利用其抽取日志获取变更的能力,将 changelog 转换为 Flink SQL 认识的 RowData 数据。
2.2 Flink CDC 特点
Flink CDC 具备如下特点
-
实时性:Flink CDC 支持实时捕获数据库中的数据变更事件,这意味着它可以立即响应数据库中的任何更改。
-
支持多种数据库:Flink CDC 支持多种关系型数据库,如 MySQL、PostgreSQL、Oracle 等。
-
易于集成:Flink CDC 可以轻松集成到现有的 Flink 应用程序中,利用 Flink 强大的流处理能力来处理捕获的数据。
-
高可用性:Flink CDC 设计为高可用的,可以在分布式环境中运行,并且支持故障恢复和容错机制。
-
灵活的数据处理:捕获的数据可以被转换、过滤、聚合等,支持复杂的数据处理逻辑。
2.3 Flink CDC 核心工作原理
Flink CDC 的工作原理主要包括以下几个步骤:
-
连接数据库:Flink CDC 通过 JDBC 连接到目标数据库,并监听数据库的变更事件。
-
捕获变更事件:通过监听数据库的日志(如 MySQL 的 Binlog、PostgreSQL 的 WAL 日志等),捕获数据的插入、更新、删除等变更事件。
-
转换为流数据:将捕获的变更事件转换为 Flink 可处理的流数据格式。
-
处理数据:将转换后的流数据送入 Flink 的流处理管道,进行进一步的数据处理,如清洗、聚合、分析等。
-
输出结果:处理后的数据可以被输出到其他系统,如写入另一个数据库、发送到消息队列、写入文件系统等。
2.4 Flink CDC 使用场景
Flink CDC(Change Data Capture)是一种用于实时捕获数据库变更事件的技术,它能够从关系型数据库中捕获表的数据变更,并将这些变更事件转化为流式数据,以便进行实时处理。基于这个特性,在下面这些场景下,可以考虑使用Flink CDC处理:
-
实时数据仓库
-
实时更新数据仓库:Flink CDC 可以实时捕获数据库中的变更事件,并将这些事件实时地加载到数据仓库中,从而实现实时的数据更新和查询。
-
实时分析:通过捕获和处理实时数据,企业可以进行实时的业务分析,比如实时监控销售数据、实时统计用户行为等。
-
-
实时数据同步
-
数据库间同步:Flink CDC 可以将一个数据库中的变更事件实时同步到另一个数据库,实现数据的实时复制。这对于需要跨数据中心或多云环境的数据同步场景非常有用。
-
异构系统间同步:可以将传统的关系型数据库的数据实时同步到 NoSQL 数据库、搜索引擎或其他存储系统中,实现数据的实时一致性和可用性。
-
-
实时数据处理
-
实时ETL:Flink CDC 可以作为 ETL(Extract, Transform, Load)流程的一部分,用于实时提取、转换和加载数据。例如,可以从数据库中实时提取数据,进行清洗和聚合处理后,再加载到数据仓库或其他系统中。
-
实时统计分析:通过对捕获的变更事件进行实时处理,可以实现各种实时统计分析,如实时流量统计、实时交易统计等。
-
-
实时告警和监控
-
实时监控:Flink CDC 可以实时捕获数据库中的变更事件,并对这些事件进行实时分析和处理,用于实时监控关键指标的变化。
-
实时告警:当检测到异常数据变更时,可以实时触发告警通知,帮助企业及时响应和处理问题。
-
-
实时推荐系统
-
实时用户行为追踪:通过对用户行为数据的实时捕获和处理,可以实现实时的个性化推荐。
-
实时更新推荐模型:Flink CDC 可以实时捕获用户的最新行为数据,并用于实时更新推荐模型,从而提高推荐的准确性和时效性。
-
-
实时交易处理
-
金融交易:在金融行业中,Flink CDC 可以用于实时捕获和处理交易数据,实现高速的交易处理和结算。
-
欺诈检测:通过对实时交易数据的分析,可以快速检测出潜在的欺诈行为,并及时采取措施。
-
-
物联网(IoT)应用
-
实时数据采集:在 IoT 场景中,Flink CDC 可以用于实时采集设备数据,并进行实时处理和分析。
-
实时决策支持:通过对 IoT 数据的实时分析,可以支持实时的决策制定,如实时调整设备的工作状态、优化资源配置等。
-
-
实时日志处理
-
实时日志分析:Flink CDC 可以用于实时捕获数据库的日志数据,并进行实时处理和分析,用于性能监控、错误诊断等。
-
审计与合规:通过实时捕获数据库的操作记录,可以用于审计和合规性的检查。
-
三、常用的数据同步方案对比
3.1 数据同步概述
当系统发展到一定阶段,尤其是系统的规模越来越大,业务体量也不断扩大的时候,一个系统可能会用到多种数据存储中间件,而不再是单纯的mysql,pgsql等,甚至一个系统中一个或多个微服务无法再满足业务的需求,而需要单独做数据存储的服务,在类似的场景下,很难避免新的服务需要从原有的微服务中抽取数据,或定期做数据的同步处理,诸如此类的场景还有很多,如何处理数据同步的需求呢?这就是下面要讨论的问题。
3.1.1 数据同步来源
系统数据同步的需求通常源于多种业务和技术上的考量。以下是一些主要的原因,解释为什么需要进行系统数据同步:
-
高可用性和灾难恢复
-
数据冗余:通过将数据同步到多个位置,可以增加数据的冗余度,减少单点故障的风险。
-
灾难恢复:在发生硬件故障、自然灾害或其他不可预见事件时,可以迅速切换到备用数据副本,确保业务连续性
-
-
负载均衡和性能优化
-
读写分离:将读操作和写操作分开处理,通过将读操作分散到多个数据库实例上来降低单一数据库的负载。
-
性能优化:通过将热点数据同步到性能更高的存储设备或系统,提升数据访问速度和系统整体性能。
-
-
地理分布
-
全球分布:在全球范围内分布数据,以减少网络延迟,提高用户体验。
-
多地备份:在不同地理位置建立数据备份中心,以应对局部故障或自然灾害的影响。
-
-
数据整合
-
数据仓库:将来自不同源的数据整合到一个统一的数据仓库中,以便进行集中管理和分析。
-
实时分析:通过实时同步数据,可以快速获取最新的业务数据,进行实时分析和决策支持。
-
-
业务扩展
-
系统迁移:当公司需要更换数据库系统或升级现有系统时,需要将数据从旧系统同步到新系统。
-
新系统上线:在部署新的应用程序或服务时,需要将现有数据同步到新的系统中。
-
-
数据共享
-
多部门协作:不同部门或团队需要访问相同的数据集,以促进信息共享和协同工作。
-
合作伙伴集成:与外部合作伙伴或第三方系统共享数据,实现数据互通和业务合作。
-
-
数据一致性
-
分布式事务:在分布式系统中,需要保证数据的一致性,通过数据同步来协调不同节点之间的状态。
-
分布式缓存:在使用分布式缓存(如 Redis Cluster)时,需要将数据同步到不同的缓存节点,保持数据的一致性。
-
-
数据迁移
-
数据转移:在进行数据迁移时,需要将数据从一个系统或数据库转移到另一个系统或数据库,以适应业务发展和技术进步的需要。
-
3.2 常用的数据同步方案汇总
下图列举了几种主流的用于解决数据同步场景的方案
关于图中几种技术,做如下简单的介绍,便于做技术选型作为对比
-
Debezium
-
Debezium是国外⽤户常⽤的CDC组件,单机对于分布式来说,在数据读取能力的拓展上,没有分布式的更具有优势,在大数据众多的分布式框架中(Hive、Hudi等)Flink CDC 的架构能够很好地接入这些框架。
-
-
DataX
-
DataX无法支持增量同步。如果一张Mysql表每天增量的数据是不同天的数据,并且没有办法确定它的产生时间,那么如何将数据同步到数仓是一个值得考虑的问题。DataX支持全表同步,也支持sql查询的方式导入导出,全量同步一定是不可取的,sql查询的方式没有可以确定增量数据的字段的话也不是一个好的增量数据同步方案。
-
-
Canal
-
Canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。Canal主要支持了MySQL的Binlog解析,将增量数据写入中间件中(例如kafka,Rocket MQ等),但是无法同步历史数据,因为无法获取到binlog的变更。
-
-
Sqoop
-
Sqoop主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql...)间进行数据的传递。Sqoop将导入或导出命令翻译成mapreduce程序来实现,这样的弊端就是Sqoop只能做批量导入,遵循事务的一致性,Mapreduce任务成功则同步成功,失败则全部同步失败。
-
-
Kettle
-
Kettle是一款开源的数据集成工具,主要用于数据抽取、转换和加载(ETL)。它提供了图形化的界面,使得用户可以通过拖拽组件的方式来设计和执行数据集成任务。Kettle 被广泛应用于数据仓库构建、数据迁移、数据清洗等多种场景。
-
虽然 Kettle 在处理中小型数据集时表现良好,但在处理大规模数据集时可能会遇到性能瓶颈,尤其是在没有进行优化的情况下。
-
Kettle 在运行时可能会消耗较多的内存和 CPU 资源,特别是当处理复杂的转换任务时。
-
虽然 Kettle 的基本操作相对简单,但对于高级功能和复杂任务的设计,用户仍需投入一定的时间和精力来学习和掌握。
-
-
Flink CDC
-
Flink CDC 基本都弥补了以上框架的不足,将数据库的全量和增量数据一体化地同步到消息队列和数据仓库中;也可以用于实时数据集成,将数据库数据实时入湖入仓;无需像其他的CDC工具一样需要在服务器上进行部署,减少了维护成本,链路更少;完美套接Flink程序,CDC获取到的数据流直接对接Flink进行数据加工处理,一套代码即可完成对数据的抽取转换和写出,既可以使用flink的DataStream API完成编码,也可以使用较为上层的FlinkSQL API进行操作。
-
3.3 为什么推荐Flink CDC
在java生态中,以一个使用较多的场景为例说明,如下,是一个业务系统与大数据系统进行交互时的数据处理流
在上面这个处理流程下,业务系统将数据正常写入mysql数据库,数据库需要开启binlog,然后借助canal监听binlog把日志写入到kafka中,而Flink实时消费Kakfa的数据实现mysql数据的同步或其他内容等,所以整个过程分为下面几个步骤:
-
mysql开启binlog
-
canal同步binlog数据写入到kafka
-
flink(或其他组件)读取kakfa中的binlog数据进行相关的业务处理。
不难发现,上面的处理流程整体的链路较长,需要用到的组件也比较多,一旦中间某个组件运行过程中发生问题,下游的其他业务均会中断,系统的脆弱性风险就增加了,而使用了 Flink CDC之后,则会变成下面这样了,也就是说,只需要数据库开启binlog即可,而后Flink CDC就可以自动进行数据同步了,整个链路就大大缩减了
3.4 Flink CDC 适用范围
Flink CDC(Change Data Capture)连接器是 Apache Flink 社区为 Flink 提供的一种用于捕获数据库变更事件的工具。它允许用户从关系型数据库中实时捕获表的数据变更,并将这些变更事件转化为流式数据,以便进行实时处理,比如你需要将mysql的数据同步到另一个mysql数据库,就需要使用mysql连接器,如果需要同步mongodb的数据,则需要使用mongodb的连接器。截止到Flink CDC 2.2 为止,支持的连接器:
支Flink CDC 持的Flink版本,在实际使用的时候需要根据版本的对照进行选择:
3.5 Flink CDC不同版本对比
Flink CDC技术发展到今天,历经了多个版本,所以底层在实现数据同步的时候技术也发生了变化,在实际开发使用中需要搞清楚这一点,为了加深对此处的理解,下面列举了几个版本的实现技术对比
3.5.1 Flink CDC 1.x
在 Flink cdc 1.x 版本中,底层选用 debezium 作为采集工具,Debezium 为保证数据一致性,通过对读取的数据库或者表进行加锁,加锁是在全量的时候加锁。
下图是开发者社区的一张全局锁和表锁的过程图
FlinkCDC全量同步时会获取全局读锁,或者表锁。所谓加锁,目的是为了确认Mysql binlog 的起始位置和Mysql 表的Schema,获取到锁后,Mysql所有的ddl,dml操作都会处于wait read lock阶段,如果锁获取时间超时,程序还会抛出异常;而增量同步时,因为是监控binlog的方式,所以对mysql没有影响。
什么是表锁,表锁会自动加锁。查询操作(SELECT),会自动给涉及的所有表加读锁,更新操作(UPDATE、DELETE、INSERT)。如果启动一个CDC任务,而另一个CDC程序也处于初始化阶段,获取不到全局锁,那么那么此程序就会去获取表级锁,表及锁锁的时间会更长,一般是全局读锁的几十倍时长。
Flink CDC 1.x 可以不加锁,能够满足大部分场景,但牺牲了一定的数据准确性。Flink CDC 1.x 默认加全局锁,虽然能保证数据一致性,但存在上述 hang 住数据的风险。由此可以看来FlinkCDC 1.x 存在着一些不足:
-
由于其锁机制,全量同步阶段之有一个任务在进行同步,不支持并发同步,数据传输会比较慢。
-
锁表时会阻止其他事务提交。
-
不支持断点续传,如果在同步过程中,出现mysql连接超时,或者flink程序快照中断,那么我们无法从断开点开始续传,因为目前暂不支持checkpoint。
3.5.2 Flink CDC 2.x
Flink 2.x 时代就是说既然大家都觉得这个锁没有必要了,那就把Debezium的锁踢开,踢开之后我们就引用了Netfix 的DBlog paper 的思想;
DBlog paper的思想说白了也是分而治之的思想,既然我对全局表加锁会有比较大的影响,那么我把数据做切分,每一段数据做大数据一致,那是不是就可以保证数据的整体一致呢?
举例来说:假如mysql表里有100万条数据,那么我把这些数据切分成1万个chunk(chunk 本意是树干,在这里就是一段数据集合的意思);每个chunk 里面有100条数据,先记录这个chunk里面的最低位和最高位;比如是101和200;读完之后把这个数据存入一个缓冲区,我们叫做buffer;如果后期有个binlog的chunk 过来,比如是105,150和160的数据发生了变化,那么我们就会针对相应的值改动,并不会改动全部的值,改动效率就比较高了,这就是无锁的大致思想了;
水平扩展说白了就是把原来的这个单线程换成多线程的处理嘛,由Flink的source 去实现的;做个checkpoint 也是对最新的那些buffer和chunk 做个checkpoint ,实现了容错;
Flink 2.x不仅引入了增量快照读取机制,还带来了一些其他功能的改进。以下是对Flink 2.x的主要功能的介绍:
-
增量快照读取:Flink 2.x引入了增量快照读取机制,这是一种全新的数据读取方式。该机制支持并发读取和以chunk为粒度进行checkpoint。在增量快照读取过程中,Flink首先根据表的主键将其划分为多个块(chunk),然后将这些块分配给多个读取器并行读取数据。这一机制极大地提高了数据读取的效率。
-
精确一次性处理:Flink 2.x引入了Exactly-Once语义,确保数据处理结果的精确一次性。MySQL CDC 连接器是Flink的Source连接器,可以利用Flink的checkpoint机制来确保精确一次性处理。
-
动态加表:Flink 2.x支持动态加表,通过使用savepoint来复用之前作业的状态,解决了动态加表的问题。
-
无主键表的处理:Flink 2.x对无主键表的读取和处理进行了优化。在无主键表中,Flink可以通过一些额外的字段来识别数据记录的唯一性,从而实现准确的数据读取和处理。
3.5.3 Flink CDC 3.x
虽然 Flink CDC 有很多技术优势,社区用户增长很快,但随着 Flink CDC 项目用户基数的日益增长,以及应用场景的不断扩大,很多问题随着用户在社区的反馈也不断冒出,因此Flink CDC技术团队也适时推出Flink CDC 3.x。
Flink CDC 3.0 的整体架构自顶而下分为 4 层:
-
Flink CDC API:面向终端用户的 API 层,用户使用 YAML 格式配置数据同步流水线,使用 Flink CDC CLI 提交任务
-
Flink CDC Connect:对接外部系统的连接器层,通过对 Flink 与现有 Flink CDC source 进行封装实现对外部系统同步数据的读取和写入
-
Flink CDC Composer:同步任务的构建层,将用户的同步任务翻译为 Flink DataStream 作业
-
Flink CDC Runtime:运行时层,根据数据同步场景高度定制 Flink 算子,实现 schema 变更、路由、变换等高级功能
四、Java使用Flink CDC同步mysql数据
4.1 环境准备
根据flink cdc的版本不同,在java或springboot集成的时候选择的jdk版本也不尽相同,需要注意这一点,否则会在运行过程中出现奇怪的问题。
4.1.1 组件版本说明
本文演示中使用的相关组件版本如下:
-
jdk17;
-
springboot版本,2.5.5;
-
mysql,8.0.23;
-
Flink CDC 相关核心依赖版本,1.13.0;
4.1.2 数据库准备
案例要演示的需求场景为:使用Flink CDC 监听某个表的数据,然后同步到另一张表中,因此需要提前准备两张表,源表和目标表,为了简单起见,两个表除了表名不一样,其他的均相同,建表sql如下:
CREATE TABLE `tb_role` (
`id` varchar(32) NOT NULL COMMENT '主键',
`role_code` varchar(32) NOT NULL COMMENT '版本号',
`role_name` varchar(32) DEFAULT NULL COMMENT '角色名称',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='tb角色表';
4.1.3 导入相关的依赖
主要是flink cdc相关的依赖包
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.35</version>
</dependency>
补充说明,单独使用java运行时,还需要添加下面两个日志组件依赖
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
4.2 使用Flink CDC动态监听mysql数据变化
需求场景:通过Flink CDC ,监听tb_role表数据变化,写入tb_role_copy
4.2.1 自定义反序列化器
反序列化器的目的是为了解析flink cdc监听到mysql表数据变化的日志,以json的形式进行解析,方便对日志中的关键参数进行处理
package com.congge.flink.blog;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
public class CustomerSchema implements DebeziumDeserializationSchema<String> {
/**
* 封装的数据格式
* {
* "database":"",
* "tableName":"",
* "before":{"id":"","tm_name":""....},
* "after":{"id":"","tm_name":""....},
* "type":"c u d"
* //"ts":156456135615
* }
*/
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//1.创建JSON对象用于存储最终数据
JSONObject result = new JSONObject();
//2.获取库名&表名
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
Struct value = (Struct) sourceRecord.value();
//3.获取"before"数据
Struct before = value.getStruct("before");
JSONObject beforeJson = new JSONObject();
if (before != null) {
Schema beforeSchema = before.schema();
List<Field> beforeFields = beforeSchema.fields();
for (Field field : beforeFields) {
Object beforeValue = before.get(field);
beforeJson.put(field.name(), beforeValue);
}
}
//4.获取"after"数据
Struct after = value.getStruct("after");
JSONObject afterJson = new JSONObject();
if (after != null) {
Schema afterSchema = after.schema();
List<Field> afterFields = afterSchema.fields();
for (Field field : afterFields) {
Object afterValue = after.get(field);
afterJson.put(field.name(), afterValue);
}
}
//5.获取操作类型 CREATE UPDATE DELETE
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toLowerCase();
if ("create".equals(type)) {
type = "insert";
}
//6.将字段写入JSON对象
result.put("database", database);
result.put("tableName", tableName);
result.put("before", beforeJson);
result.put("after", afterJson);
result.put("type", type);
//7.输出数据
collector.collect(result.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
4.2.2 自定义Sink输出
Sink即为Flink CDC的输出连接器,即监听到源表数据变化并经过处理后最终写到哪里,以mysql为例,我们在监听到tb_role表数据变化后,同步到tb_role_copy中去
package com.congge.flink.blog;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class MyJdbcSink extends RichSinkFunction<String> {
// 提前声明连接和预编译语句
private Connection connection = null;
private PreparedStatement insertStmt = null;
private PreparedStatement updateStmt = null;
private PreparedStatement preparedStatement = null;
@Override
public void open(Configuration parameters) throws Exception {
if (connection == null) {
Class.forName("com.mysql.cj.jdbc.Driver");//加载数据库驱动
connection = DriverManager.getConnection("jdbc:mysql://IP:3306/db", "root", "root");
connection.setAutoCommit(false);//关闭自动提交
}
// connection = DriverManager.getConnection("jdbc:mysql://IP:3306/db", "root", "root");
}
// 每来一条数据,调用连接,执行sql
@Override
public void invoke(String value, Context context) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(value);
String type = jsonObject.getString("type");
String tableName = "tb_role_copy";
String database = jsonObject.getString("database");
if(type.equals("insert")){
JSONObject after = (JSONObject)jsonObject.get("after");
Integer id = after.getInteger("id");
String roleCode = after.getString("role_code");
String roleName = after.getString("role_name");
String sql = String.format("insert into %s.%s values (?,?,?)", database, tableName);
insertStmt = connection.prepareStatement(sql);
insertStmt.setInt(1, Integer.valueOf(id));
insertStmt.setString(2, roleCode);
insertStmt.setString(3, roleName);
insertStmt.execute();
connection.commit();
} else if(type.equals("update")){
JSONObject after = jsonObject.getJSONObject("after");
Integer id = after.getInteger("id");
String roleCode = after.getString("role_code");
String roleName = after.getString("role_name");
String sql = String.format("update %s.%s set role_code = ?, role_name = ? where id = ?", database, tableName);
updateStmt = connection.prepareStatement(sql);
updateStmt.setString(1, roleCode);
updateStmt.setString(2, roleName);
updateStmt.setInt(3, id);
updateStmt.execute();
connection.commit();
} else if(type.equals("delete")){
JSONObject after = jsonObject.getJSONObject("before");
Integer id = after.getInteger("id");
String sql = String.format("delete from %s.%s where id = ?", database, tableName);
preparedStatement = connection.prepareStatement(sql);
preparedStatement.setInt(1, id);
preparedStatement.execute();
connection.commit();
}
}
@Override
public void close() throws Exception {
if(insertStmt != null){
insertStmt.close();
}
if(updateStmt != null){
updateStmt.close();
}
if(preparedStatement != null){
preparedStatement.close();
}
if(connection != null){
connection.close();
}
}
}
4.2.3 启动任务类
本例先以main程序运行,在实际进行线上部署使用时,可以打成jar包或整合springboot进行启动即可
package com.congge.flink.blog;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FlinkCDC_CustomerSchema {
public static void main(String[] args) throws Exception {
//1、获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// env.enableCheckpointing(3000);
//2、通过FlinkCDC构建SourceFunction并读取数据
DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.hostname("IP")
.port(3306)
.username("root")
.password("3306")
.databaseList("db")
.tableList("db.tb_role")//若不添加该数据,则消费指定数据库中所有表的数据,如果指定,指定方式为db.table
.deserializer(new CustomerSchema())
//.deserializer(new StringDebeziumDeserializationSchema())
//.startupOptions(StartupOptions.initial())
.startupOptions(StartupOptions.latest())
.build();
DataStreamSource<String> streamSource = env.addSource(sourceFunction);
//3、打印数据
streamSource.print();
streamSource.addSink(new MyJdbcSink());
//4、启动任务
env.execute("FlinkCDC_CustomerSchema");
}
}
4.2.4 效果测试
运行上面的代码,通过控制台可以看到任务已经运行起来了,监听并等待数据源数据变更
测试之前,确保两张表数据是一致的
此时为tb_role表增加一条数据,很快控制台可以监听并输出相关的日志
而后,tb_role_copy表同步新增了一条数据
4.3 与springboot整合实现过程
4.3.1 补充依赖
Flink cdc的依赖保持不变,需要单独引入logback的依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
同时,在工程的resources目录下添加logback-spring.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- 定义日志的根级别和输出方式 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} - %msg%n</pattern>
</encoder>
</appender>
<!-- 日志文件保留策略 -->
<property name="FILE_LOG_POLICY" value="10 MB" />
<!-- 日志文件最大历史数 -->
<property name="MAX_HISTORY" value="30" />
<logger name="org.apache.flink.connector.mysql.cdc" level="error" />
<!-- 应用程序的根日志级别 -->
<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>
4.3.2 启动类改造
可以直接基于启动类改造,也可以新增一个类,实现ApplicationRunner接口,重写里面的run方法
-
不难发现,run方法里面的代码逻辑即是从上述main方法运行任务里面拷贝过来的;
package com.congge;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.congge.flink.blog.CustomerSchema;
import com.congge.flink.blog.MyJdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
@SpringBootApplication
@MapperScan(basePackages = {"com.congge.dao"})
@ServletComponentScan("com.congge.filter")
public class SeekOrderApp implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(SeekOrderApp.class,args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
//1、获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//env.enableCheckpointing(3000);
//2、通过FlinkCDC构建SourceFunction并读取数据
DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.hostname("192.168.1.196")
.port(13306)
.username("root")
.password("Ghtms@123")
.databaseList("erp_cloud")
.tableList("erp_cloud.tb_role")//若不添加该数据,则消费指定数据库中所有表的数据,如果指定,指定方式为db.table
.deserializer(new CustomerSchema())
//.startupOptions(StartupOptions.initial())
.startupOptions(StartupOptions.latest())
.build();
DataStreamSource<String> streamSource = env.addSource(sourceFunction);
//3、打印数据
//streamSource.print();
streamSource.addSink(new MyJdbcSink());
//4、启动任务
env.execute("FlinkCDC_CustomerSchema");
}
}
4.3.3 效果测试
启动工程之后,相当于是通过flink cdc启动了一个用于监听数据变更的后台进程
然后我们再在数据库tb_role表增加一条数据,控制台可以看到输出了相关的日志
此时再检查数据表,可以发现tb_role_copy表新增了一条一样的数据
五、写在文末
本文通过较大的篇幅详细介绍了Flink CDC相关的技术,最后通过一个实际案例演示了使用Flink CDC同步mysql表数据的示例,希望对看到的同学有用,本篇到此结束感谢观看。
推荐阅读
-
大数据]Flink CDC 实时同步 mysql 数据
-
【2022新手指南】Java编程进阶之路 - 六、技术架构篇 ### MySQL索引底层解析与优化实战 - 你会讲解MySQL索引的数据结构吗?性能调优技巧知多少? - Redis深度揭秘:你知道多少?从基础到哨兵、主从复制全梳理 - Redis持久化及哨兵模式详解,还有集群搭建和Leader选举黑箱打开 - Zookeeper是个啥?特性和应用场景大公开 - ZooKeeper集群搭建攻略及 Leader选举、读写一致性、共享锁实现细节 - 探究ZooKeeper中的Leader选举机制及其在分布式环境中的作用 - Zab协议深入剖析:原理、功能与在Zookeeper中的核心地位 - RabbitMQ全方位解读:工作模式、消费限流、可靠投递与配置策略 - 设计者视角:RabbitMQ过期时间、死信队列与延时队列实践指南 - RocketMQ特性和应用场景揭示:理解其精髓与差异化优势 - Kafka详细介绍:特性及广泛应用于实时数据处理的场景解析 - ElasticSearch实力揭秘:特性概述与作为搜索引擎的广泛应用 - MongoDB认知升级:非关系型数据库的优势阐述,安装与使用实战教学 - BIO/NIO/AIO网络模型对比:掌握它们的区别与在网络编程中的实际应用 - Netty带你飞:理解其超快速度背后的秘密,包括线程模型分析 - 网络通信黑科技:Netty编解码原理与常用编解码器的应用,Protostuff实战演示 - 解密Netty粘包与拆包现象,怎样有效应对这一常见问题 - 自定义Netty心跳检测机制,轻松调整检测间隔时间的艺术 - Dubbo轻骑兵介绍:核心特性概览,服务降级实战与其实现益处 - Dubbo三大神器解读:本地存根与本地伪装的实战运用与优势呈现 ----------------------- 七、结语与回顾
-
SSM三大框架基础面试题-一、Spring篇 什么是Spring框架? Spring是一种轻量级框架,提高开发人员的开发效率以及系统的可维护性。 我们一般说的Spring框架就是Spring Framework,它是很多模块的集合,使用这些模块可以很方便地协助我们进行开发。这些模块是核心容器、数据访问/集成、Web、AOP(面向切面编程)、工具、消息和测试模块。比如Core Container中的Core组件是Spring所有组件的核心,Beans组件和Context组件是实现IOC和DI的基础,AOP组件用来实现面向切面编程。 Spring的6个特征: 核心技术:依赖注入(DI),AOP,事件(Events),资源,i18n,验证,数据绑定,类型转换,SpEL。 测试:模拟对象,TestContext框架,Spring MVC测试,WebTestClient。 数据访问:事务,DAO支持,JDBC,ORM,编组XML。 Web支持:Spring MVC和Spring WebFlux Web框架。 集成:远程处理,JMS,JCA,JMX,电子邮件,任务,调度,缓存。 语言:Kotlin,Groovy,动态语言。 列举一些重要的Spring模块? Spring Core:核心,可以说Spring其他所有的功能都依赖于该类库。主要提供IOC和DI功能。 Spring Aspects:该模块为与AspectJ的集成提供支持。 Spring AOP:提供面向切面的编程实现。 Spring JDBC:Java数据库连接。 Spring JMS:Java消息服务。 Spring ORM:用于支持Hibernate等ORM工具。 Spring Web:为创建Web应用程序提供支持。 Spring Test:提供了对JUnit和TestNG测试的支持。 谈谈自己对于Spring IOC和AOP的理解 IOC(Inversion Of Controll,控制反转)是一种设计思想: 在程序中手动创建对象的控制权,交由给Spring框架来管理。IOC在其他语言中也有应用,并非Spring特有。IOC容器实际上就是一个Map(key, value),Map中存放的是各种对象。 将对象之间的相互依赖关系交给IOC容器来管理,并由IOC容器完成对象的注入。这样可以很大程度上简化应用的开发,把应用从复杂的依赖关系中解放出来。IOC容器就像是一个工厂一样,当我们需要创建一个对象的时候,只需要配置好配置文件/注解即可,完全不用考虑对象是如何被创建出来的。在实际项目中一个Service类可能由几百甚至上千个类作为它的底层,假如我们需要实例化这个Service,可能要每次都搞清楚这个Service所有底层类的构造函数,这可能会把人逼疯。如果利用IOC的话,你只需要配置好,然后在需要的地方引用就行了,大大增加了项目的可维护性且降低了开发难度。 Spring中的bean的作用域有哪些? 1.singleton:该bean实例为单例 2.prototype:每次请求都会创建一个新的bean实例(多例)。 3.request:每一次HTTP请求都会产生一个新的bean,该bean仅在当前HTTP request内有效。 4.session:每一次HTTP请求都会产生一个新的bean,该bean仅在当前HTTP session内有效。 5.global-session:全局session作用域,仅仅在基于Portlet的Web应用中才有意义,Spring5中已经没有了。Portlet是能够生成语义代码(例如HTML)片段的小型Java Web插件。它们基于Portlet容器,可以像Servlet一样处理HTTP请求。但是与Servlet不同,每个Portlet都有不同的会话。 Spring中的单例bean的线程安全问题了解吗? 概念用于理解:大部分时候我们并没有在系统中使用多线程,所以很少有人会关注这个问题。单例bean存在线程问题,主要是因为当多个线程操作同一个对象的时候,对这个对象的非静态成员变量的写操作会存在线程安全问题。 有两种常见的解决方案(用于回答的点): 1.在bean对象中尽量避免定义可变的成员变量(不太现实)。 2.在类中定义一个ThreadLocal成员变量,将需要的可变成员变量保存在ThreadLocal(线程本地化对象)中(推荐的一种方式)。 ThreadLocal解决多线程变量共享问题(参考博客):https://segmentfault.com/a/1190000009236777 Spring中Bean的生命周期: 1.Bean容器找到配置文件中Spring Bean的定义。 2.Bean容器利用Java Reflection API创建一个Bean的实例。 3.如果涉及到一些属性值,利用set方法设置一些属性值。 4.如果Bean实现了BeanNameAware接口,调用setBeanName方法,传入Bean的名字。 5.如果Bean实现了BeanClassLoaderAware接口,调用setBeanClassLoader方法,传入ClassLoader对象的实例。 6.如果Bean实现了BeanFactoryAware接口,调用setBeanClassFacotory方法,传入ClassLoader对象的实例。 7.与上面的类似,如果实现了其他*Aware接口,就调用相应的方法。 8.如果有和加载这个Bean的Spring容器相关的BeanPostProcessor对象,执postProcessBeforeInitialization方法。 9.如果Bean实现了InitializingBean接口,执行afeterPropertiesSet方法。 10.如果Bean在配置文件中的定义包含init-method属性,执行指定的方法。 11.如果有和加载这个Bean的Spring容器相关的BeanPostProcess对象,执行postProcessAfterInitialization方法。 12.当要销毁Bean的时候,如果Bean实现了DisposableBean接口,执行destroy方法。 13.当要销毁Bean的时候,如果Bean在配置文件中的定义包含destroy-method属性,执行指定的方法。 Spring框架中用到了哪些设计模式? 1.工厂设计模式:Spring使用工厂模式通过BeanFactory和ApplicationContext创建bean对象。 2.代理设计模式:Spring AOP功能的实现。 3.单例设计模式:Spring中的bean默认都是单例的。 4.模板方法模式:Spring中的jdbcTemplate、hibernateTemplate等以Template结尾的对数据库操作的类,它们就使用到了模板模式。 5.包装器设计模式:我们的项目需要连接多个数据库,而且不同的客户在每次访问中根据需要会去访问不同的数据库。这种模式让我们可以根据客户的需求能够动态切换不同的数据源。 6.观察者模式:Spring事件驱动模型就是观察者模式很经典的一个应用。 7.适配器模式:Spring AOP的增强或通知(Advice)使用到了适配器模式、Spring MVC中也是用到了适配器模式适配Controller。 还有很多。。。。。。。 @Component和@Bean的区别是什么 1.作用对象不同。@Component注解作用于类,而@Bean注解作用于方法。 2.@Component注解通常是通过类路径扫描来自动侦测以及自动装配到Spring容器中(我们可以使用@ComponentScan注解定义要扫描的路径)。@Bean注解通常是在标有该注解的方法中定义产生这个bean,告诉Spring这是某个类的实例,当我需要用它的时候还给我。 3.@Bean注解比@Component注解的自定义性更强,而且很多地方只能通过@Bean注解来注册bean。比如当引用第三方库的类需要装配到Spring容器的时候,就只能通过@Bean注解来实现。 @Configuration public class AppConfig { @Bean public TransferService transferService { return new TransferServiceImpl; } } <beans> <bean id="transferService" class="com.kk.TransferServiceImpl"/> </beans> @Bean public OneService getService(status) { case (status) { when 1: return new serviceImpl1; when 2: return new serviceImpl2; when 3: return new serviceImpl3; } } 将一个类声明为Spring的bean的注解有哪些? 声明bean的注解: @Component 组件,没有明确的角色 @Service 在业务逻辑层使用(service层) @Repository 在数据访问层使用(dao层) @Controller 在展现层使用,控制器的声明 注入bean的注解: @Autowired:由Spring提供 @Inject:由JSR-330提供 @Resource:由JSR-250提供 *扩:JSR 是 java 规范标准 Spring事务管理的方式有几种? 1.编程式事务:在代码中硬编码(不推荐使用)。 2.声明式事务:在配置文件中配置(推荐使用),分为基于XML的声明式事务和基于注解的声明式事务。 Spring事务中的隔离级别有哪几种? 在TransactionDefinition接口中定义了五个表示隔离级别的常量:ISOLATION_DEFAULT:使用后端数据库默认的隔离级别,Mysql默认采用的REPEATABLE_READ隔离级别;Oracle默认采用的READ_COMMITTED隔离级别。ISOLATION_READ_UNCOMMITTED:最低的隔离级别,允许读取尚未提交的数据变更,可能会导致脏读、幻读或不可重复读。ISOLATION_READ_COMMITTED:允许读取并发事务已经提交的数据,可以阻止脏读,但是幻读或不可重复读仍有可能发生ISOLATION_REPEATABLE_READ:对同一字段的多次读取结果都是一致的,除非数据是被本身事务自己所修改,可以阻止脏读和不可重复读,但幻读仍有可能发生。ISOLATION_SERIALIZABLE:最高的隔离级别,完全服从ACID的隔离级别。所有的事务依次逐个执行,这样事务之间就完全不可能产生干扰,也就是说,该级别可以防止脏读、不可重复读以及幻读。但是这将严重影响程序的性能。通常情况下也不会用到该级别。 Spring事务中有哪几种事务传播行为? 在TransactionDefinition接口中定义了八个表示事务传播行为的常量。 支持当前事务的情况:PROPAGATION_REQUIRED:如果当前存在事务,则加入该事务;如果当前没有事务,则创建一个新的事务。PROPAGATION_SUPPORTS: 如果当前存在事务,则加入该事务;如果当前没有事务,则以非事务的方式继续运行。PROPAGATION_MANDATORY: 如果当前存在事务,则加入该事务;如果当前没有事务,则抛出异常。(mandatory:强制性)。 不支持当前事务的情况:PROPAGATION_REQUIRES_NEW: 创建一个新的事务,如果当前存在事务,则把当前事务挂起。PROPAGATION_NOT_SUPPORTED: 以非事务方式运行,如果当前存在事务,则把当前事务挂起。PROPAGATION_NEVER: 以非事务方式运行,如果当前存在事务,则抛出异常。 其他情况:PROPAGATION_NESTED: 如果当前存在事务,则创建一个事务作为当前事务的嵌套事务来运行;如果当前没有事务,则该取值等价于PROPAGATION_REQUIRED。 二、SpringMVC篇 什么是Spring MVC ?简单介绍下你对springMVC的理解? Spring MVC是一个基于Java的实现了MVC设计模式的请求驱动类型的轻量级Web框架,通过把Model,View,Controller分离,将web层进行职责解耦,把复杂的web应用分成逻辑清晰的几部分,简化开发,减少出错,方便组内开发人员之间的配合。 Spring MVC的工作原理了解嘛? image.png Springmvc的优点: (1)可以支持各种视图技术,而不仅仅局限于JSP; (2)与Spring框架集成(如IoC容器、AOP等); (3)清晰的角色分配:前端控制器(dispatcherServlet) , 请求到处理器映射(handlerMapping), 处理器适配器(HandlerAdapter), 视图解析器(ViewResolver)。 (4) 支持各种请求资源的映射策略。 Spring MVC的主要组件? (1)前端控制器 DispatcherServlet(不需要程序员开发) 作用:接收请求、响应结果,相当于转发器,有了DispatcherServlet 就减少了其它组件之间的耦合度。 (2)处理器映射器HandlerMapping(不需要程序员开发) 作用:根据请求的URL来查找Handler (3)处理器适配器HandlerAdapter 注意:在编写Handler的时候要按照HandlerAdapter要求的规则去编写,这样适配器HandlerAdapter才可以正确的去执行Handler。 (4)处理器Handler(需要程序员开发) (5)视图解析器 ViewResolver(不需要程序员开发) 作用:进行视图的解析,根据视图逻辑名解析成真正的视图(view) (6)视图View(需要程序员开发jsp) View是一个接口, 它的实现类支持不同的视图类型(jsp,freemarker,pdf等等) springMVC和struts2的区别有哪些? (1)springmvc的入口是一个servlet即前端控制器(DispatchServlet),而struts2入口是一个filter过虑器(StrutsPrepareAndExecuteFilter)。 (2)springmvc是基于方法开发(一个url对应一个方法),请求参数传递到方法的形参,可以设计为单例或多例(建议单例),struts2是基于类开发,传递参数是通过类的属性,只能设计为多例。 (3)Struts采用值栈存储请求和响应的数据,通过OGNL存取数据,springmvc通过参数解析器是将request请求内容解析,并给方法形参赋值,将数据和视图封装成ModelAndView对象,最后又将ModelAndView中的模型数据通过reques域传输到页面。Jsp视图解析器默认使用jstl。 SpringMVC怎么样设定重定向和转发的? (1)转发:在返回值前面加"forward:",譬如"forward:user.do?name=method4" (2)重定向:在返回值前面加"redirect:",譬如"redirect:http://www.baidu.com" SpringMvc怎么和AJAX相互调用的? 通过Jackson框架就可以把Java里面的对象直接转化成Js可以识别的Json对象。具体步骤如下 : (1)加入Jackson.jar (2)在配置文件中配置json的映射 (3)在接受Ajax方法里面可以直接返回Object,List等,但方法前面要加上@ResponseBody注解。 如何解决POST请求中文乱码问题,GET的又如何处理呢? (1)解决post请求乱码问题: 在web.xml中配置一个CharacterEncodingFilter过滤器,设置成utf-8; <filter> <filter-name>CharacterEncodingFilter</filter-name> <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class> <init-param> <param-name>encoding</param-name> <param-value>utf-8</param-value> </init-param> </filter> <filter-mapping> <filter-name>CharacterEncodingFilter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping> (2)get请求中文参数出现乱码解决方法有两个: ①修改tomcat配置文件添加编码与工程编码一致,如下: <ConnectorURIEncoding="utf-8" connectionTimeout="20000" port="8080" protocol="HTTP/1.1" redirectPort="8443"/> ②另外一种方法对参数进行重新编码: String userName = new String(request.getParamter("userName").getBytes("ISO8859-1"),"utf-8") ISO8859-1是tomcat默认编码,需要将tomcat编码后的内容按utf-8编码。 Spring MVC的异常处理 ? 统一异常处理: Spring MVC处理异常有3种方式: (1)使用Spring MVC提供的简单异常处理器SimpleMappingExceptionResolver; (2)实现Spring的异常处理接口HandlerExceptionResolver 自定义自己的异常处理器; (3)使用@ExceptionHandler注解实现异常处理; 统一异常处理的博客:https://blog.csdn.net/ctwy291314/article/details/81983103 SpringMVC的控制器是不是单例模式,如果是,有什么问题,怎么解决? 是单例模式,所以在多线程访问的时候有线程安全问题,不要用同步,会影响性能的,解决方案是在控制器里面不能写成员变量。(此题目类似于上面Spring 中 第5题 有两种解决方案) SpringMVC常用的注解有哪些? @RequestMapping:用于处理请求 url 映射的注解,可用于类或方法上。用于类上,则表示类中的所有响应请求的方法都是以该地址作为父路径。 @RequestBody:注解实现接收http请求的json数据,将json转换为java对象。 @ResponseBody:注解实现将conreoller方法返回对象转化为json对象响应给客户。 SpingMvc中的控制器的注解一般用那个,有没有别的注解可以替代? 一般用@Controller注解,也可以使用@RestController,@RestController注解相当于@ResponseBody + @Controller,表示是表现层,除此之外,一般不用别的注解代替。 如果在拦截请求中,我想拦截get方式提交的方法,怎么配置? 可以在@RequestMapping注解里面加上method=RequestMethod.GET。 怎样在方法里面得到Request,或者Session? 直接在方法的形参中声明request,SpringMVC就自动把request对象传入。 如果想在拦截的方法里面得到从前台传入的参数,怎么得到? 直接在形参里面声明这个参数就可以,但必须名字和传过来的参数一样。 如果前台有很多个参数传入,并且这些参数都是一个对象的,那么怎么样快速得到这个对象? 直接在方法中声明这个对象,SpringMVC就自动会把属性赋值到这个对象里面。 SpringMVC中函数的返回值是什么? 返回值可以有很多类型,有String, ModelAndView。ModelAndView类把视图和数据都合并的一起的。 SpringMVC用什么对象从后台向前台传递数据的? 通过ModelMap对象,可以在这个对象里面调用put方法,把对象加到里面,前台就可以拿到数据。 怎么样把ModelMap里面的数据放入Session里面? 可以在类上面加上@SessionAttributes注解,里面包含的字符串就是要放入session里面的key。 SpringMvc里面拦截器是怎么写的: 有两种写法,一种是实现HandlerInterceptor接口,另外一种是继承适配器类,接着在接口方法当中,实现处理逻辑;然后在SpringMvc的配置文件中配置拦截器即可: <!-- 配置SpringMvc的拦截器 --> <mvc:interceptors> <!-- 配置一个拦截器的Bean就可以了 默认是对所有请求都拦截 --> <bean id="myInterceptor" class="com.zwp.action.MyHandlerInterceptor"></bean> <!-- 只针对部分请求拦截 --> <mvc:interceptor> <mvc:mapping path="/modelMap.do" /> <bean class="com.zwp.action.MyHandlerInterceptorAdapter" /> </mvc:interceptor> </mvc:interceptors> 注解原理: 注解本质是一个继承了Annotation的特殊接口,其具体实现类是Java运行时生成的动态代理类。我们通过反射获取注解时,返回的是Java运行时生成的动态代理对象。通过代理对象调用自定义注解的方法,会最终调用AnnotationInvocationHandler的invoke方法。该方法会从memberValues这个Map中索引出对应的值。而memberValues的来源是Java常量池 三、Mybatis篇 什么是MyBatis? MyBatis是一个可以自定义SQL、存储过程和高级映射的持久层框架。 讲下MyBatis的缓存 MyBatis的缓存分为一级缓存和二级缓存,一级缓存放在session里面,默认就有, 二级缓存放在它的命名空间里,默认是不打开的,使用二级缓存属性类需要实现Serializable序列化接口, 可在它的映射文件中配置<cache/> Mybatis是如何进行分页的?分页插件的原理是什么? 1)Mybatis使用RowBounds对象进行分页,也可以直接编写sql实现分页,也可以使用Mybatis的分页插件。 2)分页插件的原理:实现Mybatis提供的接口,实现自定义插件,在插件的拦截方法内拦截待执行的sql,然后重写sql。 举例:select * from student,拦截sql后重写为:select t.* from (select * from student)t limit 0,10 简述Mybatis的插件运行原理,以及如何编写一个插件? 1)Mybatis仅可以编写针对ParameterHandler、ResultSetHandler、StatementHandler、 Executor这4种接口的插件,Mybatis通过动态代理, 为需要拦截的接口生成代理对象以实现接口方法拦截功能, 每当执行这4种接口对象的方法时,就会进入拦截方法, 具体就是InvocationHandler的invoke方法,当然, 只会拦截那些你指定需要拦截的方法。 2)实现Mybatis的Interceptor接口并复写intercept方法, 然后在给插件编写注解,指定要拦截哪一个接口的哪些方法即可, 记住,别忘了在配置文件中配置你编写的插件。 Mybatis动态sql是做什么的?都有哪些动态sql?能简述一下动态sql的执行原理不? 1)Mybatis动态sql可以让我们在Xml映射文件内, 以标签的形式编写动态sql,完成逻辑判断和动态拼接sql的功能。 2)Mybatis提供了9种动态sql标签:trim|where|set|foreach|if|choose|when|otherwise|bind。 3)其执行原理为,使用OGNL从sql参数对象中计算表达式的值, 根据表达式的值动态拼接sql,以此来完成动态sql的功能。 #{}和${}的区别是什么? 1)#{}是预编译处理,${}是字符串替换。 2)Mybatis在处理#{}时,会将sql中的#{}替换为?号,调用PreparedStatement的set方法来赋值(有效的防止SQL注入); 3)Mybatis在处理${}时,就是把${}替换成变量的值。 为什么说Mybatis是半自动ORM映射工具?它与全自动的区别在哪里? Hibernate属于全自动ORM映射工具, 使用Hibernate查询关联对象或者关联集合对象时, 可以根据对象关系模型直接获取,所以它是全自动的。 而Mybatis在查询关联对象或关联集合对象时, 需要手动编写sql来完成,所以,称之为半自动ORM映射工具。 Mybatis是否支持延迟加载?如果支持,它的实现原理是什么? 1)Mybatis仅支持association关联对象和collection关联集合对象的延迟加载, association指的就是一对一,collection指的就是一对多查询。 在Mybatis配置文件中, 可以配置是否启用延迟加载lazyLoadingEnabled=true|false。 2)它的原理是,使用CGLIB创建目标对象的代理对象, 当调用目标方法时,进入拦截器方法, 比如调用a.getB.getName, 拦截器invoke方法发现a.getB是null值, 那么就会单独发送事先保存好的查询关联B对象的sql, 把B查询上来,然后调用a.setB(b), 于是a的对象b属性就有值了, 接着完成a.getB.getName方法的调用。 这就是延迟加载的基本原理。 MyBatis与Hibernate有哪些不同? 1)Mybatis和hibernate不同,它不完全是一个ORM框架, 因为MyBatis需要程序员自己编写Sql语句, 不过mybatis可以通过XML或注解方式灵活配置要运行的sql语句, 并将java对象和sql语句映射生成最终执行的sql, 最后将sql执行的结果再映射生成java对象。 2)Mybatis学习门槛低,简单易学,程序员直接编写原生态sql, 可严格控制sql执行性能,灵活度高,非常适合对关系数据模型要求不高的软件开发, 例如互联网软件、企业运营类软件等,因为这类软件需求变化频繁, 一但需求变化要求成果输出迅速。但是灵活的前提是mybatis无法做到数据库无关性, 如果需要实现支持多种数据库的软件则需要自定义多套sql映射文件,工作量大。 3)Hibernate对象/关系映射能力强,数据库无关性好, 对于关系模型要求高的软件(例如需求固定的定制化软件) 如果用hibernate开发可以节省很多代码,提高效率。 但是Hibernate的缺点是学习门槛高,要精通门槛更高, 而且怎么设计O/R映射,在性能和对象模型之间如何权衡, 以及怎样用好Hibernate需要具有很强的经验和能力才行。 总之,按照用户的需求在有限的资源环境下只要能做出维护性、 扩展性良好的软件架构都是好架构,所以框架只有适合才是最好。 MyBatis的好处是什么? 1)MyBatis把sql语句从Java源程序中独立出来,放在单独的XML文件中编写, 给程序的维护带来了很大便利。 2)MyBatis封装了底层JDBC API的调用细节,并能自动将结果集转换成Java Bean对象, 大大简化了Java数据库编程的重复工作。 3)因为MyBatis需要程序员自己去编写sql语句, 程序员可以结合数据库自身的特点灵活控制sql语句, 因此能够实现比Hibernate等全自动orm框架更高的查询效率,能够完成复杂查询。 简述Mybatis的Xml映射文件和Mybatis内部数据结构之间的映射关系? Mybatis将所有Xml配置信息都封装到All-In-One重量级对象Configuration内部。 在Xml映射文件中,<parameterMap>标签会被解析为ParameterMap对象, 其每个子元素会被解析为ParameterMapping对象。 <resultMap>标签会被解析为ResultMap对象, 其每个子元素会被解析为ResultMapping对象。 每一个<select>、<insert>、<update>、<delete> 标签均会被解析为MappedStatement对象, 标签内的sql会被解析为BoundSql对象。 什么是MyBatis的接口绑定,有什么好处? 接口映射就是在MyBatis中任意定义接口,然后把接口里面的方法和SQL语句绑定, 我们直接调用接口方法就可以,这样比起原来了SqlSession提供的方法我们可以有更加灵活的选择和设置. 接口绑定有几种实现方式,分别是怎么实现的? 接口绑定有两种实现方式,一种是通过注解绑定,就是在接口的方法上面加 上@Select@Update等注解里面包含Sql语句来绑定, 另外一种就是通过xml里面写SQL来绑定,在这种情况下, 要指定xml映射文件里面的namespace必须为接口的全路径名. 什么情况下用注解绑定,什么情况下用xml绑定? 当Sql语句比较简单时候,用注解绑定;当SQL语句比较复杂时候,用xml绑定,一般用xml绑定的比较多 MyBatis实现一对一有几种方式?具体怎么操作的? 有联合查询和嵌套查询,联合查询是几个表联合查询,只查询一次, 通过在resultMap里面配置association节点配置一对一的类就可以完成; 嵌套查询是先查一个表,根据这个表里面的结果的外键id, 去再另外一个表里面查询数据,也是通过association配置, 但另外一个表的查询通过select属性配置。 Mybatis能执行一对一、一对多的关联查询吗?都有哪些实现方式,以及它们之间的区别? 能,Mybatis不仅可以执行一对一、一对多的关联查询, 还可以执行多对一,多对多的关联查询,多对一查询, 其实就是一对一查询,只需要把selectOne修改为selectList即可; 多对多查询,其实就是一对多查询,只需要把selectOne修改为selectList即可。 关联对象查询,有两种实现方式,一种是单独发送一个sql去查询关联对象, 赋给主对象,然后返回主对象。另一种是使用嵌套查询,嵌套查询的含义为使用join查询, 一部分列是A对象的属性值,另外一部分列是关联对象B的属性值, 好处是只发一个sql查询,就可以把主对象和其关联对象查出来。 MyBatis里面的动态Sql是怎么设定的?用什么语法? MyBatis里面的动态Sql一般是通过if节点来实现,通过OGNL语法来实现, 但是如果要写的完整,必须配合where,trim节点,where节点是判断包含节点有 内容就插入where,否则不插入,trim节点是用来判断如果动态语句是以and 或or 开始,那么会自动把这个and或者or取掉。 Mybatis是如何将sql执行结果封装为目标对象并返回的?都有哪些映射形式? 第一种是使用<resultMap>标签,逐一定义列名和对象属性名之间的映射关系。 第二种是使用sql列的别名功能,将列别名书写为对象属性名, 比如T_NAME AS NAME,对象属性名一般是name,小写, 但是列名不区分大小写,Mybatis会忽略列名大小写,
-
基于 Flink SQL CDC 的实时数据同步解决方案
-
Flink CDC 向多里斯同步数据
-
实时音频和视频技术的发展与应用-1.1 双重音频和视频 从架构上看,双人音视频系统相对简单明了。红点代表房间信令服务,房间信令服务的主要功能是管理房间信息,实现容量协商和上下行链路的质量调节,例如当下行信道发生拥塞时,上行线路的码率和分辨率会降低。 在传输信道层面,我们的策略是优先直连,在跨区域、跨运营商的情况下,我们会选择单中转或双中转信道,在策略上尽量保持直连和中转信道同时存在,当其中一个信道的质量不好时,系统会自动切断到另一个信道的流量。 1.2 多人音视频 多人视频通话的产品形态是整个房间不超过 50 人,大盘平均房间规模约为 4.x 人,房间内部最多满足一个大视频和三个小视频(四屏)。根据这一条件,我们在架构中采用了典型的 SFU 小房间设计。 上图中的红点代表房间信令服务,主要用于房间管理和状态信息同步。房间管理主要包括用户列表的管理,例如哪些用户打开了视频/音频,我看了谁,谁看了我,这些都是基于房间管理的信息,然后房间信令服务会将这些信息同步到媒体传输服务进行数据分发。 房间服务的另一个作用是房间级容量协商和质量控制,例如,房间里的每个人一开始都支持 H.265 编码,当某个时刻进来一个只支持 H.264 编码的用户时,房间里所有的上游主播就必须把 H.265 切成 H.264。还有一种情况是,房间里有一定比例的人下行链路信道质量较差,这会导致上行链路房间质量下降。 在传输层面,我们采用的是单层分布式媒体传输网络,大家都选择中转方式,不区分双人和多人,采用 Full-Mesh 传输机制将所有数据推送过去,比如一个节点上的人并不都看另外两个人的视频,但还是会将视频推送给他们。
-
Flink CDC 3.0 发布,新一代实时数据集成框架详解
-
Flink从入门到实践(三):数据实时采集 - Flink MySQL CDC
-
为何我会选择参与开源,缘起于实现MySQL CDC数据同步的经历
-
快速实现 SQL Server 至 MySQL 的数据搬家与实时同步,5分钟轻松完成