
Spark SQL on ORC: fixing the long gap between Driver startup and Task (stage) execution (slow partition computation), and too many stripes per output ORC file

Posted on 2021-05-19 11:50:07

1. Background:

The upstream job is capped at 7,000 files per day, each smaller than 256 MB, totalling 5+ billion rows in ORC format. Each file holds roughly 500 stripes. Command used to inspect a file's blocks: hdfs fsck viewfs://hadoop/nn01/warehouse/…….db/……/partition_date=2017-11-11/part-06999 -files -blocks;

Command used to check the stripe count: hive --orcfiledump viewfs://hadoop/nn01/warehouse/…….db/table/partition_date=2017-11-11/part-06999 | less
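
If paging through the orcfiledump output is inconvenient, the stripe count can also be read programmatically. A minimal sketch, assuming the Hive ORC reader API (org.apache.hadoop.hive.ql.io.orc.OrcFile) is on the classpath and the file path is passed as an argument; it only reads the file footer, not the data:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.hive.ql.io.orc.OrcFile

    object StripeCount {
      def main(args: Array[String]): Unit = {
        // Path to a single ORC part file (placeholder, pass a real path as args(0))
        val path = new Path(args(0))
        // Opening the reader only touches the file footer/metadata
        val reader = OrcFile.createReader(path, OrcFile.readerOptions(new Configuration()))
        // Each StripeInformation entry corresponds to one stripe in the file
        println(s"stripes = ${reader.getStripes.size()}, rows = ${reader.getNumberOfRows}")
      }
    }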

2. The problem:

When Spark SQL reads these ORC files, the interval between submitting the Spark job and the point where partitions are computed and Tasks start executing is far too long.

The following log lines are printed over and over:
    17/11/11 03:52:01 INFO BlockManagerMasterEndpoint: Registering block manager gh-data-hdp-dn0640.---:11942 with 6.1 GB RAM, BlockManagerId(554, ----, 11942)
    17/11/11 03:52:29 INFO DFSClient: Firstly choose dn: DatanodeInfoWithStorage[10.20.--.--:50010,DS-32f8aaa5-c6ce-48a9-a2b1-3b169df193b9,DISK], --

    17/11/11 03:52:29 INFO DFSClient: Firstly choose dn: 

Reduced to its simplest form: even the trivial SQL below shows the same symptom. After submission the ApplicationMaster (Driver) starts, but Tasks do not run and the partitions are never computed; the Spark UI cannot render the DAG and shows no Stage information.

    SELECT * from table where partition_date=2017-11-11 limit 1;

3. Analysis

Initial analysis: the Driver's GC logs confirmed that the Driver itself was reading data from the DataNodes (the header/footer information of the ORC files), which caused full GCs on the Driver.

Source tracing: the behaviour is tied to the strategy Spark uses to split ORC files.

HiveConf.java shows that Spark reads ORC files with the HYBRID strategy by default:

     

    HIVE_ORC_SPLIT_STRATEGY("hive.exec.orc.split.strategy", "HYBRID", new StringSet(new String[]{"HYBRID", "BI", "ETL"}),
        "This is not a user level config. BI strategy is used when the requirement is to spend less time in split generation"
        + " as opposed to query execution (split generation does not read or cache file footers). ETL strategy is used when"
        + " spending little more time in split generation is acceptable (split generation reads and caches file footers)."
        + " HYBRID chooses between the above strategies based on heuristics."),
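
To check which strategy a given Spark session is actually running with, the value can be read back at runtime. A minimal sketch, assuming a HiveContext created as in section 4 below (getConf with a default value is available on SQLContext/HiveContext in Spark 1.6):

    // Returns "BI"/"ETL" if overridden, otherwise the Hive default "HYBRID" shown above
    val strategy = hiveContext.getConf("hive.exec.orc.split.strategy", "HYBRID")
    println(s"hive.exec.orc.split.strategy = $strategy")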

     

The split-strategy selection in OrcInputFormat.java looks like this:

     

      public SplitStrategy call() throws IOException {
        final SplitStrategy splitStrategy;
        AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
            context.conf, context.transactionList);
        List<Long> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
        Path base = dirInfo.getBaseDirectory();
        List<FileStatus> original = dirInfo.getOriginalFiles();
        boolean[] covered = new boolean[context.numBuckets];
        boolean isOriginal = base == null;
        // if we have a base to work from
        if (base != null || !original.isEmpty()) {
          // find the base files (original or new style)
          List<FileStatus> children = original;
          if (base != null) {
            children = SHIMS.listLocatedStatus(fs, base,
                AcidUtils.hiddenFileFilter);
          }
          long totalFileSize = 0;
          for (FileStatus child : children) {
            totalFileSize += child.getLen();
            AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename
                (child.getPath(), context.conf);
            int b = opts.getBucket();
            // If the bucket is in the valid range, mark it as covered.
            // I wish Hive actually enforced bucketing all of the time.
            if (b >= 0 && b < covered.length) {
              covered[b] = true;
            }
          }
          int numFiles = children.size();
          long avgFileSize = totalFileSize / numFiles;
          switch(context.splitStrategyKind) {
            case BI:
              // BI strategy requested through config
              splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal,
                  deltas, covered);
              break;
            case ETL:
              // ETL strategy requested through config
              splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal,
                  deltas, covered);
              break;
            default:
              // HYBRID strategy
              if (avgFileSize > context.maxSize) {
                splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal, deltas,
                    covered);
              } else {
                splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal, deltas,
                    covered);
              }
              break;
          }
        } else {
          // no base, only deltas
          splitStrategy = new ACIDSplitStrategy(dir, context.numBuckets, deltas, covered);
        }
        return splitStrategy;
      }
    }

     

The HYBRID strategy: when the Spark Driver starts, it reads file metadata from the NameNode and computes the average file size from the total size and the file count. If that average exceeds the default threshold of 256 MB, the ETL strategy is chosen. ETL then reads the header/footer information of the ORC files from the DataNodes; when the stripe count is high or the metadata is large, this drives the Driver into full GC, which shows up as the long gap between Driver startup and Task execution.
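
To see which branch the heuristic would take for a given partition directory, the same calculation can be reproduced outside Spark. A minimal sketch using the Hadoop FileSystem API, with a placeholder directory path and the 256 MB threshold described above (in the real code the threshold comes from context.maxSize):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object PredictSplitStrategy {
      def main(args: Array[String]): Unit = {
        // Partition directory to inspect (placeholder, pass a real path as args(0))
        val dir = new Path(args(0))
        val fs = FileSystem.get(dir.toUri, new Configuration())
        // Only count data files, mirroring the hidden-file filter in OrcInputFormat
        val files = fs.listStatus(dir).filter(s =>
          s.isFile && !s.getPath.getName.startsWith(".") && !s.getPath.getName.startsWith("_"))
        val avg = files.map(_.getLen).sum / math.max(files.length, 1)
        // 256 MB: the default threshold described above
        val strategy = if (avg > 256L * 1024 * 1024) "ETL" else "BI"
        println(s"files=${files.length}, avgBytes=$avg, HYBRID would pick: $strategy")
      }
    }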

4. Solution:

Spark 1.6.2:

     

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)   // sc: an existing SparkContext (e.g. in spark-shell)
    // Default 64 MB: a stripe is written out once roughly 64 MB of pre-compression data has accumulated (the related
    // hive.exec.orc.default.row.index.stride=10000 controls how many rows go into one row-index entry, i.e. row group).
    // Raising it keeps the stripe count per file down; too many stripes hurt downstream readers, and under the ETL split
    // strategy (configured or chosen by the HYBRID heuristic) they force the Driver to read large metadata from the
    // DataNodes, causing frequent GC and an unacceptably long partition-computation phase.
    hiveContext.setConf("hive.exec.orc.default.stripe.size", "268435456")
    // Three strategies exist: {"HYBRID", "BI", "ETL"}, default "HYBRID". Per HiveConf: BI spends less time in split
    // generation (no reading/caching of file footers); ETL reads and caches file footers; HYBRID chooses heuristically.
    // If left unset, HYBRID switches to ETL once the estimated average file size exceeds 256 MB, and the Driver then
    // spends a long time reading DataNode data just to generate splits.
    hiveContext.setConf("hive.exec.orc.split.strategy", "BI")

     

Spark 2.2.0:

     

    import org.apache.spark.sql.SparkSession

    // Create a SparkSession with Hive support
    val sparkSession = SparkSession
      .builder()
      .appName("PvMvToBase")
      // Default 64 MB: a stripe is written out once roughly 64 MB of pre-compression data has accumulated (the related
      // hive.exec.orc.default.row.index.stride=10000 controls how many rows go into one row-index entry, i.e. row group).
      // Raising it keeps the stripe count per file down; too many stripes hurt downstream readers, and under the ETL split
      // strategy (configured or chosen by the HYBRID heuristic) they force the Driver to read large metadata from the
      // DataNodes, causing frequent GC and an unacceptably long partition-computation phase.
      .config("hive.exec.orc.default.stripe.size", 268435456L)
      // Three strategies exist: {"HYBRID", "BI", "ETL"}, default "HYBRID". Per HiveConf: BI spends less time in split
      // generation (no reading/caching of file footers); ETL reads and caches file footers; HYBRID chooses heuristically.
      // If left unset, HYBRID switches to ETL once the estimated average file size exceeds 256 MB, and the Driver then
      // spends a long time reading DataNode data just to generate splits.
      .config("hive.exec.orc.split.strategy", "BI")
      .enableHiveSupport()
      .getOrCreate()
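
As a quick sanity check that the session picked up the settings (a sketch; the table and partition names are placeholders as before):

    // Read the effective value back from the session's runtime config
    println(sparkSession.conf.get("hive.exec.orc.split.strategy"))   // expected: BI
    sparkSession.sql("SELECT * FROM table WHERE partition_date = '2011-11-11' LIMIT 1".replace("2011", "2017")).show()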
