Java自学者论坛

 找回密码
 立即注册

手机号码,快捷登录

恭喜Java自学者论坛(https://www.javazxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,会员资料板块,购买链接:点击进入购买VIP会员

JAVA高级面试进阶训练营视频教程

Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程Go语言视频零基础入门到精通Java架构师3期(课件+源码)
Java开发全终端实战租房项目视频教程SpringBoot2.X入门到高级使用教程大数据培训第六期全套视频教程深度学习(CNN RNN GAN)算法原理Java亿级流量电商系统视频教程
互联网架构师视频教程年薪50万Spark2.0从入门到精通年薪50万!人工智能学习路线教程年薪50万大数据入门到精通学习路线年薪50万机器学习入门到精通教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程MySQL入门到精通教程
查看: 818|回复: 0

Spark With Mongodb 实现方法及error code -5, 6, 13127解决方案

[复制链接]
  • TA的每日心情
    奋斗
    5 天前
  • 签到天数: 803 天

    [LV.10]以坛为家III

    2053

    主题

    2111

    帖子

    72万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    726482
    发表于 2021-4-25 04:34:36 | 显示全部楼层 |阅读模式

    1.spark mongo 读取

    val rdd = MongoSpark.builder().sparkSession(spark).pipeline(Seq(`match`(regex("path", java.util.regex.Pattern compile r.toString)))).build.toRDD()
    

      

    2.error code 6

    在spark读数据时容易遇到,mongos连接池已满,操作被拒绝,需要修改spark中的connectionperhost
    lazy val mongo = new MongoClient("192.168.12.161", MongoClientOptions.builder().connectionsPerHost(8).build())
    

      

     然后找管理员查看Mongos当前已连接数,在过多时需要进行重启mongos ./bin/mongostat --host 192.168.12.161
    PS: 修改MongoDB机器的打开文件数会明显改善此问题出现的频次,甚至不需要修改connectionsPerHost即可解决问题。修改/etc/security/limits.conf中的nofile即可,mongoDB3.4之后的版本连接数默认是65536,不用修改连接数限制。

    3.error code -5

    driver出现错误,任务终止
    Caused by: com.mongodb.MongoCursorNotFoundException: Query failed with error code -5 and error message 'Cursor 2639909050433532364 not found on server 192.168.12.161:27017' on server 192.168.12.161:27017
    at com.mongodb.operation.QueryHelper.translateCommandException(QueryHelper.java:27)
    at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:213)
    at com.mongodb.operation.QueryBatchCursor.hasNext(QueryBatchCursor.java:103)
    at com.mongodb.MongoBatchCursorAdapter.hasNext(MongoBatchCursorAdapter.java:46)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
    View Code
     cursor超时,按照官方说法为cursor10分钟未使用,查看spark日志发现是第433个分片出现错误
    17/07/17 19:37:31 ERROR Executor: Exception in task 433.0 in stage 0.0 (TID 433)
    com.mongodb.MongoCursorNotFoundException: Query failed with error code -5 and error message 'Cursor 2639909048849185072 not found on server 192.168.12.161:27017' on server 192.168.12.161:27017
    View Code
     可以看到是19:37出现的错误,这个task启动时间为: 17/07/17 19:14:23 INFO CoarseGrainedExecutorBackend: Got assigned task 433 17/07/17 19:14:23 INFO Executor: Running task 433.0 in stage 0.0 (TID 433) 可以确定的是,确实超过10分钟了,申请完cursor之后并没有开始执行,而是等到10分钟之后才开始进行操作,目前未发现原因。 查看日志发现中间有一些文件访问被拒绝的错误,:ulimit -a 看只有1024, 于是修改/etc/security/limits.conf: * soft nofile 40960 * hard nofile 40960 修改之后不设置connectionsPerHost也不会出现访问被拒绝的错误或者error code 6,但仍旧会出现error code -5 在最新一次运行中,第452、1940、2005等分片出现错误,而且分片处于不同的executor上,也就是说此错误和计算节点无关。 
    在stackoverflow上发现java driver的解决方案,java里可以使用 db.find().nocursorTimeout()来解决,但需要记得关闭cursor,不然mongos会一直占用额外内存。 去github上查看mongo-spark-connector的源代码发现: MongoRDD的compute方法:
    override def compute(split: Partition, context: TaskContext): Iterator[D] = {
    val client = connector.value.acquireClient()
    val cursor = getCursor(client, split.asInstanceOf[MongoPartition])
    context.addTaskCompletionListener((ctx: TaskContext) => {
      log.debug("Task completed closing the MongoDB cursor")
      Try(cursor.close())
      connector.value.releaseClient(client)
    })
    cursor.asScala
    }

     

     getCursor的函数:
    private def getCursor(client: MongoClient, partition: MongoPartition)(implicit ct: ClassTag[D]): MongoCursor[D] = {
    val partitionPipeline: Seq[BsonDocument] = readConfig.partitioner match {
      case MongoSinglePartitioner => pipeline
      case _                      => new BsonDocument("$match", partition.queryBounds) +: pipeline
    }
    client.getDatabase(readConfig.databaseName)
      .getCollection[D](readConfig.collectionName, classTagToClassOf(ct))
      .withReadConcern(readConfig.readConcern)
      .withReadPreference(readConfig.readPreference)
      .aggregate(partitionPipeline.asJava)
      .allowDiskUse(true)
      .iterator
    }
     对于connector来讲,每个分片创建一个Mongoclient,获取database,添加聚合数据,由于我程序中执行完toRDD操作之后直接进行了foreach,按理说不会出现获取了cursor但是未使用的状况。考虑到mongos的执行过程:一次操作获取每个shard上的一个cursor,最后把数据汇总起来返回结果。
    开始怀疑是不是因为某一个节点上pipeline执行equal的操作过慢导致cursor被拒绝,后来发现即使不加pipeline也会出现问题。
    后来排查是不是Mongodb并发读数据有问题,后来发现执行MongoSpark.load.toRDD.count并没有出错,而且访问速度也较有处理过程的快得多,于是决定先进行cache,然后count获取全量数据cache在本地,再对此rdd进行操作。解决问题的原理就是通过一个简单的count程序将所需要的数据全部读到分片本地,使用cache方法缓存起来,这样后面处理此RDD时就用的本地缓存数据,而不会因为处理时间过长出现curser超时的问题。

    因此推荐解决方案如下:

    1)单机条件下Java driver 使用
    db.find().nocursorTimeout()来解决,但需要记得关闭cursor。
    2) Spark环境下在代码真正的处理逻辑之前加上如下两句:
           rdd.cache()
           println(rdd.count())

    先把读取数据cache一下,然后使用一个简单的Action操作把数据真正缓存起来

    另一种可以解决但是不用每次都修改Spark代码逻辑的方法是:
    先修改MongoDB的代码,把AggregateIterable加入noCursorTimeout方法,然后修改mongo-spark-connector,使用此方法。是不是很6?
    因为AggregateIterable虽然和FindIterable都是获取数据的方式,但是noCursorTimeout是FindIterable的特有方法,但是又不能把connector的Aggregate方法改成Find方法,因为Find不能加Pipeline,毕竟还得加查询条件不是~
    不要看没用的
     
    

    4. error code 13127

    Query failed with error code 13127 and error message 'cursor id 206776738953 didn't exist on server.' on server 192.168.12.161:27017 和-5错误原因是一样的,同样的解决方案。

    5.spark resource引用

    试了好几种方法,最稳的还是把resource拷贝到每台机器并指定绝对路径。。
    哎...今天够累的,签到来了1...
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|小黑屋|Java自学者论坛 ( 声明:本站文章及资料整理自互联网,用于Java自学者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-11-22 05:08 , Processed in 0.066951 second(s), 28 queries .

    Powered by Discuz! X3.4

    Copyright © 2001-2021, Tencent Cloud.

    快速回复 返回顶部 返回列表