Spark 数据倾斜及其处理方案
发布时间:2022-03-01 14:39 所属栏目:125 来源:互联网
导读:本文从数据倾斜的危害、现象、原因等方面,由浅入深阐述Spark数据倾斜及其解决方案。 一、什么是数据倾斜 对 Spark/Hadoop 这样的分布式大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜。 对于分布式系统而言,理想情况下,随着系统规模(节点数量)的增
本文从数据倾斜的危害、现象、原因等方面,由浅入深阐述Spark数据倾斜及其解决方案。 一、什么是数据倾斜 对 Spark/Hadoop 这样的分布式大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜。 对于分布式系统而言,理想情况下,随着系统规模(节点数量)的增加,应用整体耗时线性下降。如果一台机器处理一批大量数据需要120分钟,当机器数量增加到3台时,理想的耗时为120 / 3 = 40分钟。但是,想做到分布式情况下每台机器执行时间是单机时的1 / N,就必须保证每台机器的任务量相等。不幸的是,很多时候,任务的分配是不均匀的,甚至不均匀到大部分任务被分配到个别机器上,其它大部分机器所分配的任务量只占总得的小部分。比如一台机器负责处理 80% 的任务,另外两台机器各处理 10% 的任务。 『不患多而患不均』,这是分布式环境下最大的问题。意味着计算能力不是线性扩展的,而是存在短板效应: 一个 Stage 所耗费的时间,是由最慢的那个 Task 决定。 由于同一个 Stage 内的所有 task 执行相同的计算,在排除不同计算节点计算能力差异的前提下,不同 task 之间耗时的差异主要由该 task 所处理的数据量决定。所以,要想发挥分布式系统并行计算的优势,就必须解决数据倾斜问题。 二、数据倾斜的危害 当出现数据倾斜时,小量任务耗时远高于其它任务,从而使得整体耗时过大,未能充分发挥分布式系统的并行计算优势。 另外,当发生数据倾斜时,部分任务处理的数据量过大,可能造成内存不足使得任务失败,并进而引进整个应用失败。 三、数据倾斜的现象 当发现如下现象时,十有八九是发生数据倾斜了: 绝大多数 task 执行得都非常快,但个别 task 执行极慢,整体任务卡在某个阶段不能结束。 原本能够正常执行的 Spark 作业,某天突然报出 OOM(内存溢出)异常,观察异常栈,是我们写的业务代码造成的。这种情况比较少见。 TIPS 在 Spark streaming 程序中,数据倾斜更容易出现,特别是在程序中包含一些类似 sql 的 join、group 这种操作的时候。因为 Spark Streaming 程序在运行的时候,我们一般不会分配特别多的内存,因此一旦在这个过程中出现一些数据倾斜,就十分容易造成 OOM。 四、如何缓解数据倾斜 基本思路 业务逻辑: 我们从业务逻辑的层面上来优化数据倾斜,比如要统计不同城市的订单情况,那么我们单独对这一线城市来做 count,最后和其它城市做整合。 程序实现: 比如说在 Hive 中,经常遇到 count(distinct)操作,这样会导致最终只有一个 reduce,我们可以先 group 再在外面包一层 count,就可以了;在 Spark 中使用 reduceByKey 替代 groupByKey 等。 参数调优: Hadoop 和 Spark 都自带了很多的参数和机制来调节数据倾斜,合理利用它们就能解决大部分问题。 思路1. 过滤异常数据 如果导致数据倾斜的 key 是异常数据,那么简单的过滤掉就可以了。 首先要对 key 进行分析,判断是哪些 key 造成数据倾斜。具体方法上面已经介绍过了,这里不赘述。 然后对这些 key 对应的记录进行分析: 空值或者异常值之类的,大多是这个原因引起 无效数据,大量重复的测试数据或是对结果影响不大的有效数据 有效数据,业务导致的正常数据分布 解决方案 对于第 1,2 种情况,直接对数据进行过滤即可。 第3种情况则需要特殊的处理,具体我们下面详细介绍。 思路2. 提高 shuffle 并行度 Spark 在做 Shuffle 时,默认使用 HashPartitioner(非 Hash Shuffle)对数据进行分区。如果并行度设置的不合适,可能造成大量不相同的 Key 对应的数据被分配到了同一个 Task 上,造成该 Task 所处理的数据远大于其它 Task,从而造成数据倾斜。 如果调整 Shuffle 时的并行度,使得原本被分配到同一 Task 的不同 Key 发配到不同 Task 上处理,则可降低原 Task 所需处理的数据量,从而缓解数据倾斜问题造成的短板效应。 (1)操作流程 RDD 操作 可在需要 Shuffle 的操作算子上直接设置并行度或者使用 spark.default.parallelism 设置。如果是 Spark SQL,还可通过 SET spark.sql.shuffle.partitions=[num_tasks] 设置并行度。默认参数由不同的 Cluster Manager 控制。 dataFrame 和 sparkSql 可以设置 spark.sql.shuffle.partitions=[num_tasks] 参数控制 shuffle 的并发度,默认为200。 (2)适用场景 大量不同的 Key 被分配到了相同的 Task 造成该 Task 数据量过大。 (3)解决方案 调整并行度。一般是增大并行度,但有时如减小并行度也可达到效果。 (4)优势 实现简单,只需要参数调优。可用最小的代价解决问题。一般如果出现数据倾斜,都可以通过这种方法先试验几次,如果问题未解决,再尝试其它方法。 (5)劣势 适用场景少,只是让每个 task 执行更少的不同的key。无法解决个别key特别大的情况造成的倾斜,如果某些 key 的大小非常大,即使一个 task 单独执行它,也会受到数据倾斜的困扰。并且该方法一般只能缓解数据倾斜,没有彻底消除问题。从实践经验来看,其效果一般。 TIPS 可以把数据倾斜类比为 hash 冲突。提高并行度就类似于 提高 hash 表的大小。 思路3. 自定义 Partitioner (1)原理 使用自定义的 Partitioner(默认为 HashPartitioner),将原本被分配到同一个 Task 的不同 Key 分配到不同 Task。 例如,我们在 groupByKey 算子上,使用自定义的 Partitioner: 复制 .groupByKey(new Partitioner() { @Override public int numPartitions() { return 12; } @Override public int getPartition(Object key) { int id = Integer.parseInt(key.toString()); if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) { return (id - 9500000) / 12; } else { return id % 12; } } }) TIPS 这个做法相当于自定义 hash 表的 哈希函数。 (2)适用场景 大量不同的 Key 被分配到了相同的 Task 造成该 Task 数据量过大。 (3)解决方案 使用自定义的 Partitioner 实现类代替默认的 HashPartitioner,尽量将所有不同的 Key 均匀分配到不同的 Task 中。 (4)优势 不影响原有的并行度设计。如果改变并行度,后续 Stage 的并行度也会默认改变,可能会影响后续 Stage。 (5)劣势 适用场景有限,只能将不同 Key 分散开,对于同一 Key 对应数据集非常大的场景不适用。效果与调整并行度类似,只能缓解数据倾斜而不能完全消除数据倾斜。而且需要根据数据特点自定义专用的 Partitioner,不够灵活。 思路4. Reduce 端 Join 转化为 Map 端 Join 通过 Spark 的 Broadcast 机制,将 Reduce 端 Join 转化为 Map 端 Join,这意味着 Spark 现在不需要跨节点做 shuffle 而是直接通过本地文件进行 join,从而完全消除 Shuffle 带来的数据倾斜。 复制 from pyspark.sql.functions import broadcast result = broadcast(A).join(B, ["join_col"], "left") 1. 2. 其中 A 是比较小的 dataframe 并且能够整个存放在 executor 内存中。 (1)适用场景 参与Join的一边数据集足够小,可被加载进 Driver 并通过 Broadcast 方法广播到各个 Executor 中。 (2)解决方案 在 Java/Scala 代码中将小数据集数据拉取到 Driver,然后通过 Broadcast 方案将小数据集的数据广播到各 Executor。或者在使用 SQL 前,将 Broadcast 的阈值调整得足够大,从而使 Broadcast 生效。进而将 Reduce Join 替换为 Map Join。 (3)优势 避免了 Shuffle,彻底消除了数据倾斜产生的条件,可极大提升性能。 (4)劣势 因为是先将小数据通过 Broadcase 发送到每个 executor 上,所以需要参与 Join 的一方数据集足够小,并且主要适用于 Join 的场景,不适合聚合的场景,适用条件有限。 NOTES 使用Spark SQL时需要通过 SET spark.sql.autoBroadcastJoinThreshold=104857600 将 Broadcast 的阈值设置得足够大,才会生效。 rightRDD 与倾斜 Key 对应的部分数据,需要与随机前缀集 (1~n) 作笛卡尔乘积 (即将数据量扩大 n 倍),从而保证无论数据倾斜侧倾斜 Key 如何加前缀,都能与之正常 Join。skewRDD 的 join 并行度可以设置为 n * k (k 为 topSkewkey 的个数)。由于倾斜Key与非倾斜Key的操作完全独立,可并行进行。 (1)适用场景 两张表都比较大,无法使用 Map 端 Join。其中一个 RDD 有少数几个 Key 的数据量过大,另外一个 RDD 的 Key 分布较为均匀。 (2)解决方案 将有数据倾斜的 RDD 中倾斜 Key 对应的数据集单独抽取出来加上随机前缀,另外一个 RDD 每条数据分别与随机前缀结合形成新的RDD(相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),然后将二者Join并去掉前缀。然后将不包含倾斜Key的剩余数据进行Join。最后将两次Join的结果集通过union合并,即可得到全部Join结果。 其实就是上一个方法的特例或者简化。少了拆分,也就没有 union。 (1)适用场景 一个数据集存在的倾斜 Key 比较多,另外一个数据集数据分布比较均匀。 (2)优势 对大部分场景都适用,效果不错。 (3)劣势 需要将一个数据集整体扩大 N 倍,会增加资源消耗。 思路7. map 端先局部聚合 在 map 端加个 combiner 函数进行局部聚合。加上 combiner 相当于提前进行 reduce ,就会把一个 mapper 中的相同 key 进行聚合,减少 shuffle 过程中数据量 以及 reduce 端的计算量。这种方法可以有效的缓解数据倾斜问题,但是如果导致数据倾斜的 key 大量分布在不同的 mapper 的时候,这种方法就不是很有效了。 复制 def antiSkew(): RDD[(String, Int)] = { val SPLIT = "-" val prefix = new Random().nextInt(10) pairs.map(t => ( prefix + SPLIT + t._1, 1)) .reduceByKey((v1, v2) => v1 + v2) .map(t => (t._1.split(SPLIT)(1), t2._2)) .reduceByKey((v1, v2) => v1 + v2) 不过进行两次 mapreduce,性能稍微比一次的差些。 (编辑:ASP站长网) |
相关内容
网友评论
推荐文章
热点阅读