Java自学者论坛

 找回密码
 立即注册

手机号码,快捷登录

恭喜Java自学者论坛(https://www.javazxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,会员资料板块,购买链接:点击进入购买VIP会员

JAVA高级面试进阶训练营视频教程

Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程Go语言视频零基础入门到精通Java架构师3期(课件+源码)
Java开发全终端实战租房项目视频教程SpringBoot2.X入门到高级使用教程大数据培训第六期全套视频教程深度学习(CNN RNN GAN)算法原理Java亿级流量电商系统视频教程
互联网架构师视频教程年薪50万Spark2.0从入门到精通年薪50万!人工智能学习路线教程年薪50万大数据入门到精通学习路线年薪50万机器学习入门到精通教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程MySQL入门到精通教程
查看: 784|回复: 0

解决spark streaming集成kafka时只能读topic的其中一个分区数据的问题

[复制链接]
  • TA的每日心情
    奋斗
    5 小时前
  • 签到天数: 804 天

    [LV.10]以坛为家III

    2053

    主题

    2111

    帖子

    72万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    726782
    发表于 2021-5-23 21:34:56 | 显示全部楼层 |阅读模式

    1. 问题描述

    我创建了一个名称为myTest的topic,该topic有三个分区,在我的应用中spark streaming以direct方式连接kakfa,但是发现只能消费一个分区的数据,多次更换comsumer group依然如此。

    2 环境配置

    kafka集群环境,

    主机 IP 操作系统 kakfa
    node1 192.168.1.101 Centos 6.5 kafka_2.11-0.10.1.1
    node2 192.168.1.102 Centos 6.5 kafka_2.11-0.10.1.1
    node3 192.168.1.103 Centos 6.5 kafka_2.11-0.10.1.1

    应用依赖:spark版本是2.1.1、kakfa版本是0.10.1.1;
    maven依赖配置如下

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>$2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.1.1</version>
    </dependency>
    

    相关配置代码(Java)如下:

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092");
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");
    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    
    Set<String> topics = new HashSet<String>(Arrays.asList("testTopic"));
    JavaInputDStream<ConsumerRecord<Object, Object>> dStream = KafkaUtils.createDirectStream(
    	jssc,
    	LocationStrategies.PreferConsistent(),
    	ConsumerStrategies.Subscribe(topics, kafkaParams));
    

    3. 解决方案

    经过查阅相关资料发现是由于Kafka 0.10.1.1的bug导致的。其实不仅仅是0.10.1.1,另外0.10.1.0和0.10.0.2也有这个问题。详细描述参考https://issues.apache.org/jira/browse/KAFKA-4547
    最后我将kafka版本降到了0.10.0.1,解决了这个问题。

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.0.1</version>
    </dependency>
    
    哎...今天够累的,签到来了1...
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|小黑屋|Java自学者论坛 ( 声明:本站文章及资料整理自互联网,用于Java自学者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-11-24 21:16 , Processed in 1.006955 second(s), 29 queries .

    Powered by Discuz! X3.4

    Copyright © 2001-2021, Tencent Cloud.

    快速回复 返回顶部 返回列表