Java自学者论坛

 找回密码
 立即注册

手机号码,快捷登录

恭喜Java自学者论坛(https://www.javazxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,会员资料板块,购买链接:点击进入购买VIP会员

JAVA高级面试进阶训练营视频教程

Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程Go语言视频零基础入门到精通Java架构师3期(课件+源码)
Java开发全终端实战租房项目视频教程SpringBoot2.X入门到高级使用教程大数据培训第六期全套视频教程深度学习(CNN RNN GAN)算法原理Java亿级流量电商系统视频教程
互联网架构师视频教程年薪50万Spark2.0从入门到精通年薪50万!人工智能学习路线教程年薪50万大数据入门到精通学习路线年薪50万机器学习入门到精通教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程MySQL入门到精通教程
查看: 853|回复: 0

Spark常见编程问题解决办法及优化

[复制链接]
  • TA的每日心情
    奋斗
    2024-4-6 11:05
  • 签到天数: 748 天

    [LV.9]以坛为家II

    2034

    主题

    2092

    帖子

    70万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    705612
    发表于 2021-6-20 14:38:26 | 显示全部楼层 |阅读模式

    1.数据倾斜

    来源:读取数据之后,包括从数据源读取和shuffle后读取

    后果:大部分task和小部分task完成时间相差很大、OOM(也有可能时异常数据的问题,需要完善代码)。

    分析:用sample + countBykey -> 除以count判断key的分布情况。

    解决方法:

    1. 采用map-side聚合的算子
    2. 提高并行度repartition
    3. 先估计分布,确定哪些key导致倾斜,如果单个key数据不是太大,可以自定义partition为其分区;如果单个key数据很大,就多key进行改造。
    4. join类倾斜:
      • 过滤掉业务无关null再join
      • 其中一个表小时,广播join
      • 倾斜数据分离:分离出倾斜部分的表,这个表通常不大,此时再广播join
      • 如果单个key过大,那只能对该key进行改造了,即为key添加一个随机后序,如0、1、2中的一个,而另一个表则要扩大3倍,每条数据的key分别加上0、1、2的后缀(为保证所有key都被分配0后缀,从而另一个表没有足够的数据join)。这里可以自定义一些UDF来实现对数据分布的估计和改造key中n,即打散程度,的选择。
    5. 数据源:尽量用可分割文件保存数据、repartition

    2.TopN

    TopN问题可分为4种

    • 已有可比较数据的总体TopN。例如从一个班的语文成绩表中找前5名的学生,只是这个表的数据在不同的节点上。
      • DF转pairRDD后用takeOrdered()。优点:不需要全排;缺点:结果为聚合到Driver的Array,所以不适合N较大的情况。
      • DF -> sort -> rdd -> zipWithIndex -> filter(index < n) 。优点:适合N较大的情况,结果仍然是分布式的;缺点:全排,N较小时比上面慢
      • DF的sort后take。优点:简单;缺点:全排,N较大时很慢,甚至会OOM(take会将结果都shuffle到一个partition中)
    • 已有可比较数据分组TopN。例如从两个班的语文成绩表中找各班前5名的学生。
      • Aggragator。优点:快;缺点:较复杂
      • window function。优点:简单,适合N较大或者数据量较小;缺点:数据量大时稍慢(window function并不进行map-side聚合,所以shuffle量较大)
    • 未有可比较数据,需要分组聚合后才能比较的总体TopN。例如一个班每名学生各科成绩都在一个表上,求总分前5名的学生。
      • DF.groupBy().agg()然后接上面的“总体TopN”
    • 未有可比较数据,需要分组聚合后才能比较的分组TopN。例如两个班每名学生各科成绩都在一个表上,求各班总分前5名的学生。
      • DF.groupBy().agg()然后使用windowFunction。因为经过前面的groupBy的shuffle后,数据已经有了partitioner。所以此处的windowfunc操作并不会shuffle
    // Aggragator例子
    class TopNAggregator[K1: TypeTag, K2: TypeTag, V: TypeTag](num: Int, ord: Ordering[(K2, V)])
      extends Aggregator[(K1, K2, V), mutable.PriorityQueue[(K2, V)], Array[(K2, V)]] {
    
      override def zero: mutable.PriorityQueue[(K2, V)] = new mutable.PriorityQueue[(K2, V)]()(ord)
    
      override def reduce(q: mutable.PriorityQueue[(K2, V)],
                           a: (K1, K2, V)): mutable.PriorityQueue[(K2, V)] = {
        if (q.size < num) {
          q += ((a._2, a._3))
        } else {
          q += ord.min((a._2, a._3), q.dequeue)
        }
      }
    
      override def merge(q1: mutable.PriorityQueue[(K2, V)],
                          q2: mutable.PriorityQueue[(K2, V)]): mutable.PriorityQueue[(K2, V)] = {
        q1 ++= q2
        while (q1.length > num) {
          q1.dequeue()
        }
        q1
      }
    
      override def finish(r: mutable.PriorityQueue[(K2, V)]): Array[(K2, V)] = {
        r.toArray.sorted(ord.reverse)
      }
    
      override def bufferEncoder: Encoder[mutable.PriorityQueue[(K2, V)]] = {
        Encoders.kryo[mutable.PriorityQueue[(K2, V)]]
      }
    
      override def outputEncoder: Encoder[Array[(K2, V)]] = ExpressionEncoder[Array[(K2, V)]]()
    }
    
    // 使用
    val topNAggregator = new TopNAggregator[Int, Int, Float](10, Ordering.by(-_._2))
    df.groupByKey()
      .agg(topNAggregator.toColumn)
    

    3.Join优化

    预排序的join

    针对SortMergeJoinExec,在mapper端提前sort。原代码在Reducer端进行排序,但reducer端的数据不及mapper端均匀,所以排序工作量不一,会导致尾部延迟放大。Map阶段会按照key的哈希值对数据进行重分区并按key排序。Reducer只需对来自不同Mapper的数据进行归并排序。这种机制相当于把Reducer排序的任务分流给Mapper。而由于Mapper的数据量往往是比较均匀的,所以排序的性能会优于Reducer。

    待考证:如果直接处理RDD,对两个需要join的RDD调用 repartitionAndSortWithinPartitions 然后join

    cross join

    当每条数据都需要和其余的每条数据进行计算时,例如计算相似度矩阵,下面的方法进行crossjoin能够大大减小其中间结果。实验时直接crossjoin能产生3G以上的数据,应用此方法则只有几十M。

    val ready2Crossjoin = movieFeatures.as[(Int, Array[Float])]
      .mapPartitions(_.grouped(4096))
    
        implicit val ordering = new Ordering[(Int, Float)] {
          def compare(x: (Int, Float), y: (Int, Float)): Int = {
            val compare2 = x._2.compareTo(y._2)
            if (compare2 != 0) return -compare2
            0
          }
        }
    
    val ratings = ready2Crossjoin.crossJoin(ready2Crossjoin)
      .as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
      .flatMap {
        case (mf1Iter, mf2Iter) =>
          val m1 = mf1Iter.size
          val m2 = math.min(mf2Iter.size, 100)
          var i = 0
          val output = new Array[(Int, Int, Float)](m1 * m2)
          val pq = mutable.PriorityQueue[(Int, Float)]()
          val vectorOp = new F2jBLAS
          mf1Iter.foreach { case (m1Id, mf1Factor) =>
            mf2Iter.foreach { case (m2Id, mf2Factor) =>
              if (m1Id == m2Id) {
                // do nothing
              } else {
                val simScore = consinSim(ALSRank, vectorOp, mf1Factor, mf2Factor)
                if (pq.length < m2) {
                  pq.enqueue((m2Id, simScore))
                } else {
                  val temp = pq.dequeue()
                  pq += (if (temp._2 > simScore) temp else (m2Id, simScore))
                }
              }
            }
            pq.foreach { case (mf2Id, score) =>
              output(i) = (m1Id, mf2Id, score)
              i += 1
            }
            pq.clear()
          }
          output.toSeq
      }
    
    private def consinSim(rank: Int, operator: F2jBLAS, movie1: Array[Float], movie2: Array[Float]): Float = {
        operator.sdot(rank, movie1,1, movie2, 1) / operator.snrm2(rank, movie1,1) * operator.snrm2(rank, movie2,1)
    }
    

    考虑Join顺序

    Spark SQL的CBO尚未成熟,不能对SQL中的join的顺序做智能调整。顺序的确定需要对数据表的分布有所了解,从而推断某些顺序能够产生更少的中间数据,进而提高效率。

    4.根据HashMap、DF等数据集进行filter

    在HashMap、DF等数据集较小的情况下:

    • HashMap:广播map,然后根据contain来filter。适合数据集较小的情况。

    • DF:提取相应的列后,然后用left_anti。适合比上面数据集稍大的情况。

    当数据集很大时,同样利用上面DF的方法,但去掉broadcast,然Spark自行决定如何join。

    // HashMap filter
    val BCMap = sc.broadcast(mapForFilter)
    val filteredDF = df.filter($"col_name" isin (BCMap.value: _*))
    
    // DF filter
    val DFForFilter = df1.select("id")
    val filteredDF = df0.join(broadcast(filteredDF), Seq("id"), "left_anti"))
    

    5.Join去掉重复的列

    val df = left.join(right, Seq("name"))
    

    6.展开NestedDF

    +---+-----------+
    | _1|         _2|
    +---+-----------+
    |  1|[2, [3, 4]]|
    +---+-----------+
    
    +---+-----+--------+--------+
    | _1|_2._1|_2._2._1|_2._2._2|
    +---+-----+--------+--------+
    |  1|    2|       3|       4|
    +---+-----+--------+--------+
    
    implicit class DataFrameFlattener(df: DataFrame) {
      def flattenSchema: DataFrame = {
        df.select(flatten(Nil, df.schema): _*)
      }
    
      protected def flatten(path: Seq[String], schema: DataType): Seq[Column] = schema match {
        case s: StructType => s.fields.flatMap(f => flatten(path :+ f.name, f.dataType))
        case other => col(path.map(n => s"`$n`").mkString(".")).as(path.mkString(".")) :: Nil
      }
    }
    
    veryNestedDF.flattenSchema.show()
    

    7.计算session/组内时间差

    val timeFmt = "yyyy-MM-dd HH:mm:ss"
    val sessionid2ActionsRDD2 = UserVisitActionDF
      .withColumn("action_time", unix_timestamp($"action_time", timeFmt))
      .groupBy("session_id")
      .agg(min("action_time") as "start",
        max("action_time") as "end",
      .withColumn("visitLength", $"start" - $"end")
    

    8.用flatMap替代map + filter

    df.flatMap(if (filter_condition) Some(result) else None)
    

    9.分层抽样

    // 各种类型抽取10%
    val fractions = HashMap(
      TYPE1 -> 0.1,
      TYPE2 -> 0.1,
      TYPE3 -> 0.1
    )
    val randomSeed = 2L
    df.stat.sampleBy("col_name", fractions, randomSeed)
    
    // 如果col_name的数据种类未知,用下面方式得出fractions
    df.select("time_period")
      .distinct
      .map(x=> (x, 0.1))
      .collectAsMap
    

    10.SQL与DF API

    SQL作为声明式语言,即只需要指定所需数据的模式就能得到结果。这种语言的编程思路容易让人忽略代码的执行顺序,从而写出一些执行效率低的代码。尽管Spark有Optimizer优化,但尚未完全成熟,部分SQL语句无法实现filter、aggregation等下推。

    DF API是一种函数式的语言,能让编程者注意到执行顺序,减小写出低效代码的可能。

    11.Shuffle后的分区

    使用DF时,开启自动分区。

    如果适用RDD,则有些shuffle是可以输入partitioner参数的,这就可以控制shuffle后的分区数,一些情况还能避免shuffle。如下面代码,rdd2执行reduceByKey的shuffle时使用rdd1的partitioner,那么之后的rdd3和rdd1的join就不需要shuffle了。

    val rdd1Partitioner = rdd1.partitioner match {
      case Some(p) => p
      case None => new HashPartitioner(rdd1.partitions.length)
    }
    val rdd3 = rdd2.reduceByKey(rdd1Partitioner, (x, y) => if (x > y) x else y)
    rdd3.join(rdd1)
    

    12.多维分析的优化

    多维分析,如rollup、cube等的算子,在Spark内置的是Expand方式,根据选用的算子一次性开辟足够的内存。如果实现Union方式的二次开发,即读取一次计算一个维度的结果,然后不断union这些结果,能在某些情况提升效率。

    总体来说,Expand方式适合维度小的多维分析,Union方式适合维度大的多维分析。这是因为Expand方式读取数据的次数只有一次,但数据会膨胀2n倍,而Union方式会读取数据2n次。

    哎...今天够累的,签到来了1...
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|小黑屋|Java自学者论坛 ( 声明:本站文章及资料整理自互联网,用于Java自学者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-4-20 21:03 , Processed in 0.073075 second(s), 29 queries .

    Powered by Discuz! X3.4

    Copyright © 2001-2021, Tencent Cloud.

    快速回复 返回顶部 返回列表