Java自学者论坛

 找回密码
 立即注册

手机号码,快捷登录

恭喜Java自学者论坛(https://www.javazxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,会员资料板块,购买链接:点击进入购买VIP会员

JAVA高级面试进阶训练营视频教程

Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程Go语言视频零基础入门到精通Java架构师3期(课件+源码)
Java开发全终端实战租房项目视频教程SpringBoot2.X入门到高级使用教程大数据培训第六期全套视频教程深度学习(CNN RNN GAN)算法原理Java亿级流量电商系统视频教程
互联网架构师视频教程年薪50万Spark2.0从入门到精通年薪50万!人工智能学习路线教程年薪50万大数据入门到精通学习路线年薪50万机器学习入门到精通教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程MySQL入门到精通教程
查看: 885|回复: 0

解决spark中遇到的数据倾斜问题

[复制链接]
  • TA的每日心情
    奋斗
    2024-4-6 11:05
  • 签到天数: 748 天

    [LV.9]以坛为家II

    2034

    主题

    2092

    帖子

    70万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    705612
    发表于 2021-6-25 14:44:11 | 显示全部楼层 |阅读模式

    一. 数据倾斜的现象

    多数task执行速度较快,少数task执行时间非常长,或者等待很长时间后提示你内存不足,执行失败。

    二. 数据倾斜的原因

    常见于各种shuffle操作,例如reduceByKey,groupByKey,join等操作。

    数据问题

    1. key本身分布不均匀(包括大量的key为空)
    2. key的设置不合理

    spark使用问题

    1. shuffle时的并发度不够
    2. 计算方式有误

    三. 数据倾斜的后果

    1. spark中一个stage的执行时间受限于最后那个执行完的task,因此运行缓慢的任务会拖累整个程序的运行速度(分布式程序运行的速度是由最慢的那个task决定的)。
    2. 过多的数据在同一个task中执行,将会把executor撑爆,造成OOM,程序终止运行。

    一个理想的分布式程序: 
    理想的分布式程序

    发生数据倾斜时,任务的执行速度由最大的那个任务决定: 
    发生数据倾斜

    四. 数据问题造成的数据倾斜

    发现数据倾斜的时候,不要急于提高executor的资源,修改参数或是修改程序,首先要检查数据本身,是否存在异常数据。

    找出异常的key

    如果任务长时间卡在最后最后1个(几个)任务,首先要对key进行抽样分析,判断是哪些key造成的。

    选取key,对数据进行抽样,统计出现的次数,根据出现次数大小排序取出前几个

    df.select("key").sample(false,0.1).(k=>(k,1)).reduceBykey(_+_).map(k=>(k._2,k._1)).sortByKey(false).take(10)

    如果发现多数数据分布都较为平均,而个别数据比其他数据大上若干个数量级,则说明发生了数据倾斜。

    经过分析,倾斜的数据主要有以下三种情况:

    1. null(空值)或是一些无意义的信息()之类的,大多是这个原因引起。
    2. 无效数据,大量重复的测试数据或是对结果影响不大的有效数据。
    3. 有效数据,业务导致的正常数据分布。

    解决办法

    第1,2种情况,直接对数据进行过滤即可。

    第3种情况则需要进行一些特殊操作,常见的有以下几种做法。

    1. 隔离执行,将异常的key过滤出来单独处理,最后与正常数据的处理结果进行union操作。
    2. 对key先添加随机值,进行操作后,去掉随机值,再进行一次操作。
    3. 使用reduceByKey 代替 groupByKey
    4. 使用map join。

    举例:

    如果使用reduceByKey因为数据倾斜造成运行失败的问题。具体操作如下:

    1. 将原始的 key 转化为 key + 随机值(例如Random.nextInt)
    2. 对数据进行 reduceByKey(func)
    3. 将 key + 随机值 转成 key
    4. 再对数据进行 reduceByKey(func)

    tip1: 如果此时依旧存在问题,建议筛选出倾斜的数据单独处理。最后将这份数据与正常的数据进行union即可。

    tips2: 单独处理异常数据时,可以配合使用Map Join解决。

    五. spark使用不当造成的数据倾斜

    1. 提高shuffle并行度

    dataFramesparkSql可以设置spark.sql.shuffle.partitions参数控制shuffle的并发度,默认为200。 
    rdd操作可以设置spark.default.parallelism控制并发度,默认参数由不同的Cluster Manager控制。

    局限性: 只是让每个task执行更少的不同的key。无法解决个别key特别大的情况造成的倾斜,如果某些key的大小非常大,即使一个task单独执行它,也会受到数据倾斜的困扰。

    2. 使用map join 代替reduce join

    在小表不是特别大(取决于你的executor大小)的情况下使用,可以使程序避免shuffle的过程,自然也就没有数据倾斜的困扰了。

    局限性: 因为是先将小数据发送到每个executor上,所以数据量不能太大。

    具体使用方法和处理流程参照:

    Spark map-side-join 关联优化

    spark join broadcast优化

    六. MapReduce过程中数据倾斜的处理

    1. 过滤无效数据,如空值、测试数据等等
    2. 在map端使用combiner函数
    3. 局部聚合加全局聚合。
      1. 先对key加随机后缀,然后进行reduce操作
      2. 对第一次执行的结果再此进行MR操作。(在map端去掉后缀后再进行reduce操作)

     

    哎...今天够累的,签到来了1...
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|小黑屋|Java自学者论坛 ( 声明:本站文章及资料整理自互联网,用于Java自学者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-5-1 19:06 , Processed in 0.071043 second(s), 29 queries .

    Powered by Discuz! X3.4

    Copyright © 2001-2021, Tencent Cloud.

    快速回复 返回顶部 返回列表