Java自学者论坛

 找回密码
 立即注册

手机号码,快捷登录

恭喜Java自学者论坛(https://www.javazxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,会员资料板块,购买链接:点击进入购买VIP会员

JAVA高级面试进阶训练营视频教程

Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程Go语言视频零基础入门到精通Java架构师3期(课件+源码)
Java开发全终端实战租房项目视频教程SpringBoot2.X入门到高级使用教程大数据培训第六期全套视频教程深度学习(CNN RNN GAN)算法原理Java亿级流量电商系统视频教程
互联网架构师视频教程年薪50万Spark2.0从入门到精通年薪50万!人工智能学习路线教程年薪50万大数据入门到精通学习路线年薪50万机器学习入门到精通教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程MySQL入门到精通教程
查看: 1222|回复: 0

sparksql系列(五) SparkSql异常处理,优化,及查看执行计划

[复制链接]
  • TA的每日心情
    奋斗
    6 天前
  • 签到天数: 803 天

    [LV.10]以坛为家III

    2053

    主题

    2111

    帖子

    72万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    726482
    发表于 2021-7-1 16:53:42 | 显示全部楼层 |阅读模式
    有了上面四篇文章,再加上一些异常处理、优化,开发基本就没什么问题了。下面我们开始:

    一:SparkSql异常处理

    将类转换为DF

    实际开发过程中有很多需要将一个数字或者汇聚出来的数据转换为DF的需求

    这时候可以将数字或者数据转换成一个类,将类转换为DF

    val data = scala.collection.mutable.MutableList[Data]()
    data.+=(Data("a","b"))
    import sparkSession.implicits._
    data.toDF().show(100)

    读JSON文件异常处理

        val sparkSession= SparkSession.builder().master("local").getOrCreate()

        var df2 = sparkSession.emptyDataFrame
        try {
          df2 = sparkSession.read.json("/JAVA/data/")
        } catch {
          case e: Exception => {
            println("error info")
          }
        }
        df2.show(100)

    读CSV文件异常处理

        val sparkSession= SparkSession.builder().master("local").getOrCreate()

        var df2 = sparkSession.emptyDataFrame
        try {
          df2 = sparkSession.read.option("sep", "|").csv("/JAVA/data/")
            .toDF("name","sex")
        } catch {
          case e: Exception => {
            println("error info")
          }
        }
        df2.show(100)

    读TEXT文件异常处理。

        个人理解CSV和TEXT一样,直接csv即可。还有一个原因是TEXT需要手动的去切分字符串作为一个列,使用起来太不方便了。还不如直接使用CSV

    写文件异常

        val sparkSession= SparkSession.builder().master("local").getOrCreate()
        var df = sparkSession.emptyDataFrame
        df = sparkSession.read.option("sep", "|").csv("/JAVA/data")
    .      toDF("name","sex")
        df.write.mode(SaveMode.Overwrite).option("sep", "|").csv("/JAVA/data1")

     

        SaveMode.Overwrite:覆盖式写文件,没有文件夹会创建文件夹

        SaveMode.Append:添加式写文件,没有文件夹会报错,建议使用SaveMode.Overwrite

    数据异常填充

        进行真正开发的时候,经常join导致有一些空值(NULL),有时候产品需要将空值转换为一些特殊处理值:

        val sparkSession= SparkSession.builder().master("local").getOrCreate()
        val javasc = new JavaSparkContext(sparkSession.sparkContext)

        val nameRDD = javasc.parallelize(Arrays.asList(
          "{'name':'','age':''}",
          "{'name':'sunliu','age':'19','vip':'true'}"));
        val namedf = sparkSession.read.json(nameRDD)

        namedf.na.fill(Map("name"->"zhangsan","age"->"18","vip"->"false")).show(100)//第一个数据不是空值,是空字符串

    age name vip
        false
    19 wangwu true
    19 wangwu true

    二:SparkSql优化

    缓存

                         Spark中当一个Rdd多次使用的时候就需要进行缓存。缓存将大大的提高代码运行效率。

          val sparkSession= SparkSession.builder().master("local").getOrCreate()
          val javasc = new JavaSparkContext(sparkSession.sparkContext)

          val nameRDD = javasc.parallelize(Arrays.asList(
            "{'name':'','age':''}",
            "{'name':'sunliu','age':'19','vip':'true'}"));
          val namedf = sparkSession.read.json(nameRDD)
          namedf.persist(StorageLevel.MEMORY_AND_DISK_SER)     

          个人建议使用MEMORY_AND_DISK_SER,因为内存还是比较珍贵的,磁盘虽然慢但是大。

          尽量不要使用MEMORY_AND_DISK_SER_2,这种后面有一个_2的,因为这是备份两个,一般情况下是不需要备份两个的。备份多了浪费内存。

    Join策略

        Spark有三种join的策略:broadcast join、Shuffle Hash Join、BroadcastHashJoin

        broadcastHash join(大表和极小表):

          当大表join小表的时候:将小表进行广播到各个节点。

          优点:不用进行数据shuffle,每个节点进行自己节点上数据的计算

          缺点:将一个表的数据全部加载到主节点,对主节点的压力较大。

          参数:广播的默认大小是10M可以适当将大小调整。 sparkSession.sql("set spark.sql.autoBroadcastJoinThreshold=134217728")

        Shuffle Hash Join(大表和小表)

          两个表进行重新分区之后,进行两个分区的数据遍历。

          优点:分区之后数据更小了,就全部加载到内存遍历就行了

          缺点:相对于broadcastHash join来说还是有一次shuffle

        SortMergeJoin(大表和小表)

          两个表进行重新分区之后,进行两个分区的数据遍历,个人感觉分区前和Shuffle Hash Join没什么区别。

          缺点:分区之后数据还不能全部加载到内存,需要进行排序。将相同key的加载到内存。

    执行计划

        val sparkSession= SparkSession.builder().master("local").getOrCreate()
        val javasc = new JavaSparkContext(sparkSession.sparkContext)

        val nameRDD = javasc.parallelize(Arrays.asList("{'name':'wangwu','age':'18','vip':'t'}"));
        val namedf = sparkSession.read.json(nameRDD)

        namedf.explain()//显示执行计划

    上线提交命令示例

        spark-submit
        --class class
        --master yarn                                      
        --executor-memory 6g                     //最大值取决于yarn.scheduler.maximum-allocation-mb
        --driver-memory 4g                           //driver内存
        --num-executors 4                            //executors个数
        --executor-cores 6                            //执行的核数
        --deploy-mode cluster                      //必须配置,默认是单节点模式
        --conf spark.driver.maxResultSize=6g
        Jar.jar

                   //executor-memory 和executor-cores的比例,应该和集群内存核数比例相同.例如集群1000G内存200核.那executor-memory除executor-cores应该是5

     

     

     

    Apache中文文档

                    http://spark.apachecn.org/#/docs/7?id=spark-sql-dataframes-and-datasets-guide

     
    哎...今天够累的,签到来了1...
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|小黑屋|Java自学者论坛 ( 声明:本站文章及资料整理自互联网,用于Java自学者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-11-23 16:22 , Processed in 1.122388 second(s), 29 queries .

    Powered by Discuz! X3.4

    Copyright © 2001-2021, Tencent Cloud.

    快速回复 返回顶部 返回列表