欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

蜂巢优化实践

最编程 2024-05-04 19:50:00
...

1、前言

对于流行的分布式计算框架(如离线的MapReduce、流计算Storm、迭代内存计算Spark、流式计算Flink),“数据量大”从来都不是问题,因为理论上来说,都可以通过增加并发的节点数来解决。

但是如果数据倾斜或者分布不均匀了,那么就会是问题。此时不能简单地通过增加并发节点数来解决问题,而必须采用针对性的措施和优化方案来解决。

这也正是本文讨论的主要内容。实际上,Hive SQL的各种优化方案基本都和数据倾斜密切先关,因此本章首先介绍“数据倾斜”的基本概念,然后在此基础上仔细介绍各种场景下的Hive优化方案。

Hive的优化分为join相关的join优化和join无关的优化,从项目实际上来说,join相关的优化占了Hive大部分内容,而join相关的优化又分为mapjoin可以解决的join优化和mapjoin无法解决的join优化。本文将逐一详细介绍其原理及优化方案

2、离线数据处理的主要挑战:数据倾斜

在进入具体的Hive各个场景优化之前,首先介绍“数据倾斜”的概念

实际上,并没有专门针对数据倾斜给出的一个理论定义。“倾斜”应该来自于统计学里的偏态分布。所谓偏态分布,即统计数据峰值与平均值不相等的频率分布,根据峰值小于或大于平均值可分为正篇函数和负偏函数,其偏离的程度可用偏态系数刻画。数据处理中的倾斜和次相关,但是含义有着很多不同。下面介绍数据处理中的数据倾斜。

对于分布式数据处理来说,我们希望数据平均分布到每个处理节点。如果以每个处理节点为X轴,每个节点处理的数据为Y轴,我们希望的柱状图是下图的样式:

但是实际上由于业务数据本身的问题或者分布算法的问题,每个节点分配到的数据量很可能是下图所示的样式:

更极端情况下还可能是下图所示的形式:

也就是说,只有待分到最多数据的节点处理完数据,整个数据处理任务才能完成,此时分布式的意义就大大折扣。实际上,即使每个节点分配到的数据量大致相同,数据仍然可能倾斜,比如考虑统计词频的极端问题,如果某个节点分配到的次都是一个词,那么显然此节点需要的耗时将很长,即使其数据量和其他节点的数据量相同。

总结:Hive的数据倾斜就是MapReduce过程中由于Shuffle过程导致的数据(key)分布不均匀

Hive SQL中如何排查数据倾斜问题?

  • 某个reduce task,卡在99.9%半天不动。
  • Reduce处理的数据量巨大,在做full gc的时候,stop the world。导致响应超时,超出默认的600秒,任务被杀掉

Hive的优化正式采用各种措施和方法对上述场景的倾斜问题进行优化和处理。

3、开发时注意的问题

在实际Hive SQL开发的过程中,Hive SQL性能的问题上实际只有一小部分和数据倾斜。很多时候,Hive SQL运行得慢是由开发人员对于使用的数据了解不够以及一些不良的使用习惯引起的。

开发人员注意一下几点:

  • 需要计算的指标真的需要从数据仓库的共同明细层来自行汇总吗?是不是数据公共层团队开发的公共汇总层已经可以满足自己的需求?对于大众的、KPI相关的指标等通常涉及良好的数据仓库肯定已经包含了,直接使用即可。
  • 真的需要扫描这么多分区表吗?比如对于销售明细事物表来说,扫描一年的分区和扫描一周的分区所带来的计算、IO开销完全是两个量级,所耗费的时间肯定也是不同的。并不是说不能扫描一年的分区,而是希望开发人员需要仔细考虑业务需求,尽量不浪费计算和存储资源,毕竟大数据也不是毫无代价的。
  • 尽量不要使用select * from table这样方式,用到哪些列就指定哪些列,如select,col1,col2 from table。另外,where条件中也尽量添加过滤条件,以去掉无关的数据行,从而减少整个MapReduce任务重需要处理、分发的数据量。
  • 输入文件不要是大量的小文件。Hive 的默认Input Split是128MB(可配置),小文件可先合成大文件。

在保证了上述几点后,有的时候发现Hive SQL还是要运行很长时间,甚至运行不出来,这时就需要真正的Hive优化技术了。

下面逐一详细介绍各种场景下的Hive优化方法,但是开发人员需要了解自己的SQL,并根据执行过程中慢的环节来定位是何种问题,进而采用下述针对性解决方案。

4、具体优化

4.1、group by引起的倾斜优化

group by引起的倾斜主要是输入数据行按照group by列分布不均匀引起的,比如,假设统计2020年淘宝每个店铺的订单数量,那么大部分店铺的订单量显然非常多,而多数店铺的订单量就一般,由于group by的时候是按照店铺的ID分发到每个Reduce Task,那么此时分配到大店铺的Reduce Task就分配了更多的订单,从而导致数据倾斜。

对于group by引起的倾斜,优化措施非常简单,只需要设置下面参数即可:

set hive.map.aggr = true 
set hive.groupby.skewindata=true

此时Hive在数据倾斜的时候会进行负载均衡,生成的查询计划有两个MapReduce Job。第一个MapReduce Job中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作并输出结果,这样处理的结果是相同的group by key有可能被分布到不同的Reduce中,从而达到负载均衡的目的;第二个MapReduce Job再根据预处理的数据结果按照group by key分布到Reduce中(这个过程可以保证相同的group by key被分布到同一个Reduce中),最后完成最终的聚合操作。

4.2、count distinct优化

在Hive开发过程中,应该小心使用count distinct,因为很容易引起性能的问题,比如下面的SQL:

select count(distinct user_id) from user_table

由于必须去重,因此Hive将会把Map阶段的输出全部分布到一个Reduce Task上,此时很容易引起性能问题。对于这种情况,可以通过先group by再count的方式来优化,优化后的SQL如下:

select count(1) from
(
select user_id from user_table group by user_id
)a

其原理为:利用group by 去重,再统计group by的行数目

4.3、本地模式(fetch模式)

在我们执行Hive SQL的时候,一旦执行MapReduce,效率极低,这个时候可以把hive内部设置将一些语句不需要走MapReduce程序,数据直接从本地读取 由于hive.fetch.task.conversion这个参数有以下三个值

set hive.fetch.task.conversion=none

此时只有desc语句不走MapReduce

set hive.fetch.task.conversion=mimimal

此时desc和select * 、filter、limit不走MapReduce

set hive.fetch.task.conversion=more

此时desc和select语句、所有的过滤条件、limit不走MapReduce

4.4、并行执行

Hive会将一个查询转化成一个或者多个阶段,这样的阶段可以使MapReduce阶段、抽样阶段、合并阶段、limit阶段、或者Hive在执行过程中可能需要的其他阶段。默认情况下,Hive一次只执行一个阶段。不过,某个特定的job可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段可以并行执行,这样可能使得整个job的执行时间缩短。不过,如果有更多的阶段可以并行执行,那么job可能就越快完成。

通过设置参数,就可以开启平行执行。不过,在共享集群中,需要注意下,如果job中并行执行的阶段增多,那么集群利用率就会增加

set hive.exec.paraller = true;

4.5、严格模式

Hive提供了一个严格模式,可以防止用户执行那些可能产生意想不到的不好的影响的查询。

通过设置属性 hive.mapred.mode值为strict可以禁止3中类型的查询

  • 其一,对于分区表,除非where语句中含有分区字段过滤条件来限制数据范围,否则不允许执。换句话说,就是用户不允许扫描所有分区。进行这个限制的原因是,通常分区表都拥有非常大的数据集,而且数据增加迅速。没有进行分区限制的查询可能会消耗令人不可接受的巨大资源来处理这个表:
hive> select distinct(user_id) from hive_trace where user_id > 1;
FAILED: SemanticException Queries against partitioned tables without a partition filter are disabled for safety reasons. If you know what you are doing, please sethive.strict.checks.large.query to false and that hive.mapred.mode is not set to 'strict' to proceed. Note that if you may get errors or incorrect results if you make a mistake while using some of the unsafe features. No partition predicate for Alias "hive_trace" Table "hive_trace"
  • 对于使用了order by语句的基础,要求必须使用limit语句,因为order by为了执行排序过程会将所有的结果数据分发到同一个reducer中进行处理,强制要求用户增加这个limit语句防止reducer额外执行很长一段时间:
hive> select user_id from hive_trace where event_month = '202010' order by user_id;
FAILED: SemanticException 1:69 Order by-s without limit are disabled for safety reasons. If you know what you are doing, please sethive.strict.checks.large.query to false and that hive.mapred.mode is not set to 'strict' to proceed. Note that if you may get errors or incorrect results if you make a mistake while using some of the unsafe features.. Error encountered near token 'user_id'
  • 其三,也就是最后一种情况,就是限制笛卡尔积的查询。对关系型数据库非常了解的用户可能期望在执行join查询的时候不使用on语句而是使用where语句,这样关系型数据库的执行优化器可以高效的将where语句转换成那个on语句。不幸的是,Hive并不会执行这种优化,因此如果表足够大,那么这个查询就会出现不可控的情况:

4.6、调整MapTask和ReduceTask的个数

Hive通过查询划分成一个或多个MapReduce任务达到并行目的地。每个任务都可能具有多个mapper和reducer任务,其中至少一些是可以并行的。确定最佳的mapper个数和reducer个数取决于多个变量,例如输入的数据量大小以及对这些数据执行的操作类型等。

保持平衡性是有必要的。如果有太多的mapper和reducer任务,就会导致启动阶段、调度和运行job过程中产生过多的开销;而如果设置的数量太少,那么就可能没有充分的利用好集群的内在的并行性。

当执行的Hive具有reduce过程时,CLI控制台会打印出调优的reducer个数。下面我们来看一下包含有group by语句的例子,引种这种查询总是需要reduc过程的。与此相反,很多查询会转换成只需map阶段的任务; Hive是按照输入的数据量大小来确定reducer的个数。我们可以通过dfs -count命令来计算输入量大小,这个命令和Linux中的du -s命令类似;其可以计算指定目录下所有数据的总大小:

有些情况下查询的map阶段会产生比实际输入数据量要很多的数据。如果map阶段产生的数据量非常多,那么根据输入的数据量大小来确定reducer个数就显得有些少了。同样地,map阶段也可能会过滤掉输入数据集中的很大一部分的数据而这时可能需要少量的reducer就满足计算了。

一个快速的进行验证的方式就是将reducer个数设置为固定的值,而无需Hive来计算得到这个值,如果用户还记得的话,Hive的默认reducer个数应该是3.可以通过设置属性 mapred.reduce.tasks的值为不同的值来确定是使用较多还是较少的reducer来缩短执行时间。需要记住,受外部因素影响,像这样的标杆值十分复杂,例如其他用户并发执行job的情况。Hadoop需要消耗几秒时间来启动和调度map和reduce任务(task)。在进行性能测试的时候,要考虑到这些影响因子,特别是job比较小的时候。

当在共享集群上处理大任务时,为了控制资源利用情况,属性hive.exec.reducers.max显得非常重要。一个Hadoop集群可以提供的map和reduce资源个数是固定的。某个大job可能就会消耗完所有的资源,从而导致其他job无法运行。通过设置属性hive.exec.reducers.max可以阻止某个查询消耗太多的reducer资源。有必要将这个属性配置到$HIVE_HOME/conf/hive-site.xml 文件中。对这个属性值大小的一个建议计算公式如下:

(集群总reducer槽位个数 * 1.5) / (执行中查询的平均个数)

4.7、JVM重用

JVM重用是Hadoop调优参数的内容,其对Hive的性能具有非常大的影响,特别是对于很难避免小文件的场景或task特别多的场景,这类场景大多数执行时间都很短。

Hadoop默认通常配置是常用派生JVM来执行map和reduce任务的。这时JVM的启动过程可能会造成相当大的开销,尤其是执行的job包含成百上千个task任务的情况。JVM重用可以使得JVM实例在同一个job中重新使用N此,N的值可以再Hadoop的mapred-site.xml文件(位于$HADOOP_HOME/conf目录下)中进行设置:

<property>
      <name>mapred.job.reuse.jvm.num.tasks</name>
      <value>10</value>
      <description>
       How many tasks so run per jvm. If set to -1,there is no limit.
      </description>
</property>

总结:让一个continuer可以运行多个task任务,从最根本上减少continuer的开启和销毁时间,提升程序的整体执行效率

4.8、单个MapReduce中多个group by

另一个特别的优化视图将查询中的多个group by操作组装到单个MapReduce任务中。如果想启动这个优化,那么需要一组常用的group by键:

<property>
      <name>hive.multigrouphy.singlemr</name>
      <value>false</value>
</property>

4.9、大表join小表优化

首先介绍大表join小表优化。仍以销售明细事实表为例来说明大表join小表的场景。假如供应商会进行评级,比如(五星、四星、三星、两星、一星),此时业务人员希望能够分析各供应商星级的每天销售情况及其占比
开发人员一般会写出如下SQL:

select 
  b.seller_star, 
  count(a.order_id) as order_cnt 
from 
  (
    select 
      order_id, 
      seller_id 
    from 
      order_table 
    where 
      day = '2020-11-25'
  ) as a 
  left outer join (
    select 
      seller_id, 
      seller_start 
    from 
      dim_seller 
    where 
      day = '2020-11-25'
  ) as b on a.seller_id = b.seller_id 
group by 
  b.seller_star;

但正如上述所言,现实世界的二八准则将导致订单集中在部分供应商上,而好的供应商的评级通常会更高,此时更加剧了数据倾斜的程度,如果不加以优化,上述SQL将会消费很长时间,甚至运行不出结果。

通常来说,供应商是有限的,比如上千家、上万家数据量不会很大,而销售明细事实表表比较大,这就是典型的大表join小表文集,可以通过mapjoin的方式来优化,只需添加mapjoin hint即可,优化后的SQL如下:

select /*+mapjoin(b)*/
  b.seller_star, 
  count(a.order_id) as order_cnt 
from 
  (
    select 
      order_id, 
      seller_id 
    from 
      order_table 
    where 
      day = '2020-11-25'
  ) as a 
  left outer join (
    select 
      seller_id, 
      seller_start 
    from 
      dim_seller 
    where 
      day = '2020-11-25'
  ) as b on a.seller_id = b.seller_id 
group by 
  b.seller_star;

/+mapjoin(b)/ 即mapjoin hint,如果需要mapjoin多个表,则格式为/+mapjoin(b,cd)/.Hive对于mapjoin是默认开启的,设置参数为:

set hive.auto.convert.join = true;

mapjoin优化是在Map阶段进行join,而不是像通常那样在Reduce阶段按照join列进行分发后在每个Reduce任务节点上进行join,不需要分发也就没有倾斜的问题,相反Hive会将小表全量复制到每个Map任务节点上(对于本利是dim_seller表,当然仅全量复制b表SQL指定的列),然后每个Map任务节点执行lookup小表即可。

从上述分析可以看出,小表不能太大,否则全量复制分发得不偿失,实际上Hive根据参数:

hive.auto.convert.join.noconditionaltask.size

来确定小表的大小是否满足条件(默认25MB),实际中此参数值所允许的最大值可以修改,但是一般最大不能超过1GB(太大的话Map任务所在的节点内存会撑爆,Hive会报错。另外需要注意的是,HDFS显示的文件大小是压缩后的大小,当实际加载到内存的时候,容量会增大很多,很多场景下可能会膨胀10倍)。

4.10、大表join大表优化

如果上述mapjoin中小表dim_seller很大呢?比如超过了1GB的大小?这种就是大表join大表的问题。此类问题相对比较复杂,因此本段首先引入一个具体的问题场景,然后基于此介绍各种优化方案。

4.10.1、问题场景

问题场景如下:
A表为一个汇总表,汇总的卖家买家最近N天交易汇总信息。即对于每个卖家最近N天,其每个买家共成交了多少单、总金额是多少,为了专注于本段要解决的问题,N只取90天,汇总值仅取成交单数。A表的字段有:buyer_id,seller_id,和pay_cnt_90d。

B为卖家基本信息表,其中包含卖家的一个分层评级信息,比如卖家分为6个级别:S0、S1、S2、S3、S4、和S5。

要获得的结果是每个买家在各个级别卖家的成交比例信息,比如-->某买家:S0:10%,S1:20%,S2:30%,S3:20%,S4:10%,S5:20%

B表的字段有:seller_id、s_level

开发人员的第一反应,直接join两表并统计:

select 
  m.buyer_id,
  sum(b.pay_cnt_90d) as pay_cnt_90d,
  sum(case when m.s_level = 0 then pay_cnt_90d end) as pay_cnt_90d_s0,
  sum(case when m.s_level = 1 then pay_cnt_90d end) as pay_cnt_90d_s1,
  sum(case when m.s_level = 2 then pay_cnt_90d end) as pay_cnt_90d_s2,
  sum(case when m.s_level = 3 then pay_cnt_90d end) as pay_cnt_90d_s3,
  sum(case when m.s_level = 4 then pay_cnt_90d end) as pay_cnt_90d_s4,
  sum(case when m.s_level = 5 then pay_cnt_90d end) as pay_cnt_90d_s5
from 
  (
    select 
      a.buyer_id, 
      a.seller_id, 
      b.s_level, 
      a.pay_cnt_90d 
    from 
      (
        select 
          buyer_id, 
          seller_id, 
          pay_cnt_90d 
        from 
          table_A
      ) as a 
      join (
        select 
          seller_id, 
          s_level 
        from 
          table_b
      ) as b on a.seller_id = b.seller_id
  ) as m 
group by 
  m.buyer_id;

但是此SQL会引起数据倾斜,原因在于卖家的二八准则,某些卖家90天内会有几百万甚至上千万的买家,但是大部分卖家90天内的买家数目并不多,join table_A和table_B的时候ODPS会按照seller_id进行分发,table_A的大卖家引起了数据倾斜。

但是本数据倾斜问题无法用mapjoin table_B解决,因为卖家有超过千万条、文件大小有几个GB,超过了mapjoin表最大1GB的限制。

4.10.2、方案1:join时用case when语句

此种解决方案应用场景为:倾斜的值是明确的而且数量很少,比如null值引起的倾斜。其核心是将这些倾斜的值随机分到Reduce,其主要核心逻辑在于join时对这些特殊的值concat随机数,从而达到随机分发的目的。此方案的核心逻辑如下:

select a.user_id,a.order_id,b.user_id
from table_a a
join table_b b
on (case when a.user_id is null then concat('hive',rand()) else a.user_id end) = b.user_id;

Hive已经对此进行了优化,只需要设置参数skewinfo和skewjoin参数,不需要修改SQL代码,例如,由于table_b的值“0”和“1”引起了倾斜,只需做如下设置:

set hive.optmize.skewinfo = table_b:(seller_id)[("0")("1")];
set hive.optmize.skewjoin = true;

但是方案2也无法解决本问题场景的倾斜问题,因为倾斜的卖家大量存在而且动态变化

4.10.3、方案2:动态一分为二

终极解决方案就是动态一分为二,即对倾斜的键值和不倾斜的键值分卡处理,不倾斜的正常join即可,倾斜的把它们找出来然后做mapjoin,最后union all其结果即可。

但是此种解决方案比较麻烦,代码会变得复杂而且需要一个临时表存放倾斜的键值。

采用此解决方案的伪代码如下所示:

-- 由于数据倾斜,先找出近90天买家数超过10000的卖家
insert overwrite table tmp_table_b
select 
  m.seller_id, 
  n.s_level 
from 
  (
    select 
      seller_id 
    from 
      (
        select 
          seller_id, 
          count(buyer_id) as byr_cnt 
        from 
          table_A 
        group by 
          seller_id
      ) as 
    where 
      a.byr_cnt > 10000
  ) m
 left outer join(
 	select 
    	user_id,
        s_level
    from table_b
 ) n
 on m.seller_id = n.user_id;

对于90天买家数超过10000的卖家直接mapjoin,对于其他卖家正常join即可