欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

如何同步实现数据一致性和基于日志的实时提取?

最编程 2024-06-16 18:24:02
...

本文根据DBAplus社群第85期线上分享整理而成

 

讲师介绍 20161219102547736.jpg

王东

宜信技术研发中心架构师

 

  • 目前就职于宜信技术研发中心,任架构师,负责流式计算和大数据业务产品解决方案。

  • 曾任职于Naver china(韩国最大搜索引擎公司)中国研发中心资深工程师,多年从事CUBRID分布式数据库集群开发和CUBRID数据库引擎开发

    http://www.cubrid.org/blog/news/cubrid-cluster-introduction/

 

主题简介:

  1. DWS 的背景介绍

  2. dbus+wormhole 总体架构和技术实现方案

  3. DWS的实际运用案例

 

前言  

 

大家好,我是王东,来自宜信技术研发中心,这是我来社群的第一次分享,如果有什么不足,请大家多多指正、包涵。

 

本次分享的主题是《基于日志的DWS平台实现和应用》, 主要是分享一下目前我们在宜信做的一些事情。这个主题里面包含到2个团队很多兄弟姐妹的努力的结果(我们团队和山巍团队的成果)。这次就由我代为执笔,尽我努力给大家介绍一下。

 

其实整个实现从原理上来说是比较简单的,当然也涉及到不少技术。我会尝试用尽量简单的方式来表达,让大家了解这个事情的原理和意义。在过程中,大家有问题可以随时提出,我会尽力去解答。

 

DWS是一个简称,是由3个子项目组成,我稍后做解释。

 

一、背景

 

事情是从公司前段时间的需求说起,大家知道宜信是一个互联网金融企业,我们的很多数据与标准互联网企业不同,大致来说就是:

 

20161219102612940.png

 

玩数据的人都知道数据是非常有价值的,然后这些数据是保存在各个系统的数据库中,如何让需要数据的使用方得到一致性、实时的数据呢?

 

过去的通用做法有几种是:

  1. DBA开放各个系统的备库,在业务低峰期(比如夜间),使用方各自抽取所需数据。由于抽取时间不同,各个数据使用方数据不一致,数据发生冲突,而且重复抽取,相信不少DBA很头疼这个事情。

  2. 公司统一的大数据平台,通过Sqoop 在业务低峰期到各个系统统一抽取数据, 并保存到Hive表中, 然后为其他数据使用方提供数据服务。这种做法解决了一致性问题,但时效性差,基本是T+1的时效。

  3. 基于trigger的方式获取增量变更,主要问题是业务方侵入性大,而且trigger也带来性能损失。

 

这些方案都不算完美。我们在了解和考虑了不同实现方式后,最后借鉴了 linkedin的思想,认为要想同时解决数据一致性和实时性,比较合理的方法应该是来自于log。

 

20161219102623736.jpg
此图来自:https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/)

 

把增量的Log作为一切系统的基础。后续的数据使用方,通过订阅kafka来消费log。

 

比如:

  • 大数据的使用方可以将数据保存到Hive表或者Parquet文件给Hive或Spark查询;

  • 提供搜索服务的使用方可以保存到Elasticsearch或HBase 中;

  • 提供缓存服务的使用方可以将日志缓存到Redis或alluxio中;

  • 数据同步的使用方可以将数据保存到自己的数据库中;

  • 由于kafka的日志是可以重复消费的,并且缓存一段时间,各个使用方可以通过消费kafka的日志来达到既能保持与数据库的一致性,也能保证实时性;

 

为什么使用log和kafka作为基础,而不使用Sqoop进行抽取呢? 因为:

 

20161219102639770.png

 

为什么不使用dual write(双写)呢?,请参考https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/

 

我这里就不多做解释了。

 

二、总体架构

 

于是我们提出了构建一个基于log的公司级的平台的想法。

 

下面解释一下DWS平台, DWS平台是有3个子项目组成:

  1. Dbus(数据总线):负责实时将数据从源端实时抽出,并转换为约定的自带schema的json格式数据(UMS 数据),放入kafka中;

  2. Wormhole(数据交换平台):负责从kafka读出数据 将数据写入到目标中;

  3. Swifts(实时计算平台):负责从kafka中读出数据,实时计算,并将数据写回kafka中。

 

20161219102659674.jpg

 

图中:

  • Log extractor和dbus共同完成数据抽取和数据转换,抽取包括全量和增量抽取。

  • Wormhole可以将所有日志数据保存到HDFS中; 还可以将数据落地到所有支持jdbc的数据库,落地到HBash,Elasticsearch,Cassandra等;

  • Swifts支持以配置和SQL的方式实现对进行流式计算,包括支持流式join,look up,filter,window aggregation等功能;

  • Dbus web是dbus的配置管理端,rider除了配置管理以外,还包括对Wormhole和Swifts运行时管理,数据质量校验等。

 

由于时间关系,我今天主要介绍DWS中的Dbus和Wormhole,在需要的时候附带介绍一下Swifts。

 

三、dbus解决方案

 

日志解析

 

如前面所说,Dbus主要解决的是将日志从源端实时的抽出。 这里我们以MySQL为例子,简单说明如何实现。

 

我们知道,虽然MySQL InnoDB有自己的log,MySQL主备同步是通过binlog来实现的。如下图:

 

20161219102717279.jpg
图片来自:https://github.com/alibaba/canal

 

而binlog有三种模式:

  1. Row 模式:日志中会记录成每一行数据被修改的形式,然后在slave端再对相同的数据进行修改。

  2. Statement 模式: 每一条会修改数据的sql都会记录到 master的bin-log中。slave在复制的时候SQL进程会解析成和原来master端执行过的相同的SQL来再次执行。

  3. Mixed模式: MySQL会根据执行的每一条具体的sql语句来区分对待记录的日志形式,也就是在Statement和Row之间选择一种。

 

他们各自的优缺点如下:

 

20161219102731251.jpg
此处来自:http://www.jquerycn.cn/a_13625

 

由于statement 模式的缺点,在与我们的DBA沟通过程中了解到,实际生产过程中都使用row 模式进行复制。这使得读取全量日志成为可能。

 

通常我们的MySQL布局是采用 2个master主库(vip)+ 1个slave从库 + 1个backup容灾库 的解决方案,由于容灾库通常是用于异地容灾,实时性不高也不便于部署。

 

为了最小化对源端产生影响,显然我们读取binlog日志应该从slave从库读取。

读取binlog的方案比较多,github上不少,参考https://github.com/search?utf8=%E2%9C%93&q=binlog。最终我们选用了阿里的canal做位日志抽取方。

 

Canal最早被用于阿里中美机房同步, canal原理相对比较简单:

  1. Canal模拟MySQL Slave的交互协议,伪装自己为MySQL Slave,向MySQL Slave发送dump协议

  2. MySQL master收到dump请求,开始推送binary log给Slave(也就是canal)

  3. Canal解析binary log对象(原始为byte流)

 

20161219102749773.jpg
图片来自:https://github.com/alibaba/canal

 

解决方案

 

Dbus 的MySQL版主要解决方案如下:

 

20161219102800264.jpg

 

对于增量的log,通过订阅Canal Server的方式,我们得到了MySQL的增量日志:

 

 

  • 按照Canal的输出,日志是protobuf格式,开发增量Storm程序,将数据实时转换为我们定义的UMS格式(json格式,稍后我会介绍),并保存到kafka中;

  • 增量Storm程序还负责捕获schema变化,以控制版本号;

  • 增量Storm的配置信息保存在Zookeeper中,以满足高可用需求。

  • Kafka既作为输出结果也作为处理过程中的缓冲器和消息解构区。

 

在考虑使用Storm作为解决方案的时候,我们主要是认为Storm有以下优点:

  • 技术相对成熟,比较稳定,与kafka搭配也算标准组合;

  • 实时性比较高,能够满足实时性需求;

  • 满足高可用需求;

  • 通过配置Storm并发度,可以活动性能扩展的能力;

 

全量抽取

 

对于流水表,有增量部分就够了,但是许多表需要知道最初(已存在)的信息。这时候我们需要initial load(第一次加载)。

 

对于initial load(第一次加载),同样开发了全量抽取Storm程序通过jdbc连接的方式,从源端数据库的备库进行拉取。initial load是拉全部数据,所以我们推荐在业务低峰期进行。好在只做一次,不需要每天都做。

 

全量抽取,我们借鉴了Sqoop的思想。将全量抽取Storm分为了2 个部分:

  1. 数据分片

  2. 实际抽取

 

数据分片需要考虑分片列,按照配置和自动选择列将数据按照范围来分片,并将分片信息保存到kafka中。

 

20161219102828533.jpg

 

下面是具体的分片策略:

 

20161219102841601.jpg

 

全量抽取的Storm程序是读取kafka的分片信息,采用多个并发度并行连接数据库备库进行拉取。因为抽取的时间可能很长。抽取过程中将实时状态写到Zookeeper中,便于心跳程序监控。

 

20161219102921349.jpg

 

统一消息格式

 

无论是增量还是全量,最终输出到kafka中的消息都是我们约定的一个统一消息格式,称为UMS(unified message schema)格式。

 

如下图所示:

 

20161219102945374.png

20161219103014471.png

 

消息中schema部分,定义了namespace 是由 类型+数据源名+schema名+表名+版本号+分库号+分表号 能够描述整个公司的所有表,通过一个namespace就能唯一定位。

 

  • _ums_op_ 表明数据的类型是I(insert),U(update),D(删除);

  • _ums_ts_ 发生增删改的事件的时间戳,显然新的数据发生的时间戳更新;

  • _ums_id_ 消息的唯一id,保证消息是唯一的,但这里我们保证了消息的先后顺序(稍后解释);

 

payload是指具体的数据,一个json包里面可以包含1条至多条数据,提高数据的有效载荷。

 

UMS中支持的数据类型,参考了Hive类型并进行简化,基本上包含了所有数据类型。

 

全量和增量的一致性

 

在整个数据传输中,为了尽量的保证日志消息的顺序性,kafka我们使用的是1个partition的方式。在一般情况下,基本上是顺序的和唯一的。

 

但是我们知道写kafka会失败,有可能重写,Storm也用重做机制,因此,我们并不严格保证exactly once和完全的顺序性,但保证的是at least once。

因此_ums_id_变得尤为重要。

 

对于全量抽取,_ums_id_是唯一的,从zk中每个并发度分别取不同的id片区,保证了唯一性和性能,填写负数,不会与增量数据冲突,也保证他们是早于增量消息的。

 

对于增量抽取,我们使用的是MySQL的日志文件号 + 日志偏移量作为唯一id。Id作为64位的long整数,高7位用于日志文件号,低12位作为日志偏移量。

例如:000103000012345678。 103 是日志文件号,12345678 是日志偏移量。

 

这样,从日志层面保证了物理唯一性(即便重做也这个id号也不变),同时也保证了顺序性(还能定位日志)。通过比较_ums_id_ 消费日志就能通过比较_ums_id_知道哪条消息更新。

 

其实_ums_ts_与_ums_id_意图是类似的,只不过有时候_ums_ts_可能会重复,即在1毫秒中发生了多个操作,这样就得靠比较_ums_id_了。

 

心跳监控和预警

 

整个系统涉及到数据库的主备同步,Canal Server,多个并发度Storm进程等各个环节。

 

因此对流程的监控和预警就尤为重要。

 

通过心跳模块,例如每分钟(可配置)对每个被抽取的表插入一条心态数据并保存发送时间,这个心跳表也被抽取,跟随着整个流程下来,与被同步表在实际上走相同的逻辑(因为多个并发的的Storm可能有不同的分支),当收到心跳包的时候,即便没有任何增删改的数据,也能证明整条链路是通的。

 

Storm程序和心跳程序将数据发送公共的统计topic,再由统计程序保存到influxdb中,使用grafana进行展示,就可以看到如下效果:

 

20161219103050610.jpg

 

图中是某业务系统的实时监控信息。上面是实时流量情况,下面是实时延时情况。可以看到,实时性还是很不错的,基本上1~2秒数据就已经到末端kafka中。

 

Granfana提供的是一种实时监控能力。

 

如果出现延时,则是通过dbus的心跳模块发送邮件报警或短信报警。

 

实时脱敏

 

考虑到数据安全性,对于有脱敏需求的场景,Dbus的全量storm和增量storm程序也完成了实时脱敏的功能。脱敏方式有3种:

 

20161219103106947.png

 

总结一下:简单的说,Dbus就是将各种源的数据,实时的导出,并以UMS的方式提供订阅, 支持实时脱敏,实际监控和报警。

 

四、Wormhole解决方案

 

说完Dbus,该说一下Wormhole,为什么两个项目不是一个,而要通过kafka来对接呢?

 

其中很大一个原因就是解耦,kafka具有天然的解耦能力,程序直接可以通过kafka做异步的消息传递。Dbus和Wornhole内部也使用了kafka做消息传递和解耦。

 

另外一个原因就是,UMS是自描述的,通过订阅kafka,任何有能力的使用方来直接消费UMS来使用。

 

虽然UMS的结果可以直接订阅,但还需要开发的工作。Wormhole解决的是:提供一键式的配置,将kafka中的数据落地到各种系统中,让没有开发能力的数据使用方通过wormhole来实现使用数据。

 

20161219103125658.jpg

 

如图所示,Wormhole 可以将kafka中的UMS 落地到各种系统,目前用的最多的HDFS,JDBC的数据库和HBase。

 

在技术栈上, wormhole选择使用spark streaming来进行。

 

在Wormhole中,一条flow是指从一个namaspace从源端到目标端。一个spark streaming服务于多条flow。

 

20161219103201131.jpg

 

选用Spark的理由是很充分的:

  • Spark天然的支持各种异构存储系统;

  • 虽然Spark Stream比Storm延时稍差,但Spark有着更好的吞吐量和更好的计算性能;

  • Spark在支持并行计算方面有更强的灵活性;

  • Spark提供了一个技术栈内解决Sparking Job,Spark Streaming,Spark SQL的统一功能,便于后期开发;

 

这里补充说一下Swifts的作用:

  • Swifts的本质是读取kafka中的UMS数据,进行实时计算,将结果写入到kafka的另外一个topic。

  • 实时计算可以是很多种方式:比如过滤filter,projection(投影),lookup, 流式join window aggregation,可以完成各种具有业务价值的流式实时计算。

 

Wormhole和Swifts对比如下:

 

20161219103223476.jpg

 

落HDFS

 

通过Wormhole Wpark Streaming程序消费kafka的UMS,首先UMS log可以被保存到HDFS上。

 

kafka一般只保存若干天的信息,不会保存全部信息,而HDFS中可以保存所有的历史增删改的信息。这就使得很多事情变为可能:

  • 通过重放HDFS中的日志,我们能够还原任意时间的历史快照。

  • 可以做拉链表,还原每一条记录的历史信息,便于分析;

  • 当程序出现错误是,可以通过回灌(backfill),重新消费消息,重新形成新的快照。

 

可以说HDFS中的日志是很多的事情基础。

 

介于Spark原生对parquet支持的很好,Spark SQL能够对Parquet提供很好的查询。UMS落地到HDFS上是保存到Parquet文件中的。Parquet的内容是所有log的增删改信息以及_ums_id_,_ums_ts_都存下来。

 

Wormhole spark streaming根据namespace 将数据分布存储到不同的目录中,即不同的表和版本放在不同目录中。

 

20161219103248892.png

 

由于每次写的Parquet都是小文件,大家知道HDFS对于小文件性能并不好,因此另外还有一个job,每天定时将这些的Parquet文件进行合并成大文件。

每个Parquet文件目录都带有文件数据的起始时间和结束时间。这样在回灌数据时,可以根据选取的时间范围来决定需要读取哪些Parquet文件,不必读取全部数据。

 

插入或更新数据的幂

推荐阅读