Java自学者论坛

 找回密码
 立即注册

手机号码,快捷登录

恭喜Java自学者论坛(https://www.javazxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,会员资料板块,购买链接:点击进入购买VIP会员

JAVA高级面试进阶训练营视频教程

Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程Go语言视频零基础入门到精通Java架构师3期(课件+源码)
Java开发全终端实战租房项目视频教程SpringBoot2.X入门到高级使用教程大数据培训第六期全套视频教程深度学习(CNN RNN GAN)算法原理Java亿级流量电商系统视频教程
互联网架构师视频教程年薪50万Spark2.0从入门到精通年薪50万!人工智能学习路线教程年薪50万大数据入门到精通学习路线年薪50万机器学习入门到精通教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程MySQL入门到精通教程
查看: 981|回复: 0

Redis的Pub/Sub机制存在的问题以及解决方案

[复制链接]
  • TA的每日心情
    奋斗
    2024-11-24 15:47
  • 签到天数: 804 天

    [LV.10]以坛为家III

    2053

    主题

    2111

    帖子

    72万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    726782
    发表于 2021-8-29 13:14:48 | 显示全部楼层 |阅读模式

      Redis的Pub/Sub机制使用非常简单的方式实现了观察者模式,但是在使用过程中我们发现,它仅仅是实现了发布订阅机制,但是很多的场景没有考虑到。例如一下的几种场景:

      1.数据可靠性无法保证

      一个redis_cli发送消息的时候,消息是无状态的,也就是说负责发送消息的redis_cli只管发送消息,并不会理会消息是否被订阅者接收到,也不会理会是否在传输过程中丢失,即对于发布者来说,消息是”即发即失”的。

      2.扩展性差

      不能通过增加消费者来加快消耗发布者的写入的数据,如果发布者发布的消息很多,则数据阻塞在通道中已等待被消费着来消耗。阻塞时间越久,数据丢失的风险越大(网络或者服务器的一个不稳定就会导致数据的丢失)

      3.资源消耗高

      在pub/sub中消息发布者不需要独占一个Redis的链接,而消费者则需要单独占用一个Redis的链接,在java中便不得独立出分出一个线程来处理消费者。这种场景一般对应这多个消费者,此时则有着过高的资源消耗。

      对于如上的几种不足,如果在项目中需要考虑的话可以使用JMS来实现该功能。JMS提供了消息的持久化/耐久性等各种企业级的特性。如果依然想使用Redis来实现并做一些数据的持久化操作,则可以根据JMS的特性来通过Redis模拟出来.

      模拟的步骤如下:
      1.subscribe端首先向一个Set集合中增加“订阅者ID”,此Set集合保存了“活跃订阅”者,订阅者ID标记每个唯一的订阅者,例如:sub:email,sub:web。此SET称为“活跃订阅者集合”

      2.subcribe端开启订阅操作,并基于Redis创建一个以“订阅者ID”为KEY的LIST数据结构,此LIST中存储了所有的尚未消费的消息。此LIST称为“订阅者消息队列”

      3.publish端:每发布一条消息之后,publish端都需要遍历“活跃订阅者集合”,并依次向每个“订阅者消息队列”尾部追加此次发布的消息。到此为止,我们可以基本保证,发布的每一条消息,都会持久保存在每个“订阅者消息队列”中。

      4.subscribe端,每收到一个订阅消息,在消费之后,必须删除自己的“订阅者消息队列”头部的一条记录。subscribe端启动时,如果发现自己的自己的“订阅者消息队列”有残存记录,那么将会首先消费这些记录,然后再去订阅。

      实现如下:

    public class PubSubListener extends JedisPubSub{
    
        private String clientId;
        private RedisHandler redisHandler;
    
        public PubSubListener(String clientId, Jedis jedis) {
            this.clientId = clientId;
            this.redisHandler = new RedisHandler(jedis);
        }
    
        @Override
        public void onMessage(String channel, String message) {
            if ("exit".equals(message)) {
                redisHandler.onUnsubscribe(channel);
            }
    
            redisHandler.hanlder(channel,message);
        }
    
        @Override
        public void onSubscribe(String channel, int subscribedChannels) {
            redisHandler.subscribe(channel);
        }
    
        @Override
        public void onUnsubscribe(String channel, int subscribedChannels) {
            redisHandler.onUnsubscribe(channel);
        }
    
        class RedisHandler{
    
            private Jedis jedis =null;
    
            public RedisHandler(Jedis jedis) {
                this.jedis = jedis;
            }
    
            /**
             * 订阅操作步骤:
             * 1.判断clientID是否在PERSITS_SUB队列中
             * 2.如果在队列中说明已经订阅,或则把clientID添加到队列中
             * @param channel
             */
            public void subscribe(String channel){
                String key = clientId + "/" + channel;
                boolean isExists = this.jedis.sismember("PERSITS_SUB",key);
                if(!isExists){
                    this.jedis.sadd("PERSITS_SUB",key);
                }
            }
    
            /**
             * 取消订阅
             * @param channel
             */
            public void onUnsubscribe(String channel){
                String key = clientId + "/" + channel;
                //从订阅者队列中删除
                this.jedis.srem("",key);
                //删除订阅者消息队列
                this.jedis.del(channel);
            }
    
            public void hanlder(String channel,String message){
                int index = message.indexOf("/");
                if(index  < 0){
                    //消息不合法,丢弃
                    return;
                }
    
                Long txid = Long.valueOf(message.substring(0,index));
                String key = clientId + "/" + channel;
                while(true){
                    String lm = this.jedis.lindex(key,0);//获取第一个消息
                    if(lm == null){
                        break;
                    }
                    int li = lm.indexOf("/");
                    if(li < 0){
                        //消息不合法
                        String result = this.jedis.lpop(key);
                        if(result == null){
                            break;
                        }
                        message(channel,message);
                        continue;
                    }
                    long lmid = Long.parseLong(lm.substring(0,li));
                    if(txid >= lmid){
                        this.jedis.lpop(key);
                        message(channel,message);
                        continue;
                    }else{
                        break;
                    }
                }
            }
        }
    
        private void message(String channel, String message) {
            System.out.println("receive message " + message);
        }
    }

      

    public class SubClient {
    
        private Jedis jedis = null;
        private PubSubListener listener = null;
    
        public SubClient(Jedis jedis, PubSubListener listener) {
            this.jedis = jedis;
            this.listener = listener;
        }
    
        public void subscribe(String channel){
            jedis.subscribe(listener,channel);
        }
    
        public void onUnsubscribe(String channel){
            listener.unsubscribe(channel);
        }
    }

      

    public class PubClient {
    
        private Jedis jedis = null;
    
        public PubClient(String host) {
            this.jedis = new Jedis(host);
        }
    
        public void put(String message){
            Set<String> clients = this.jedis.smembers("PERSITS_SUB");
            for(String client : clients){
                //在每个客户端对应的消息队列中持久化消息
                this.jedis.rpush(client,message);
            }
        }
    
        /**
         * 每个消息,都有具有一个全局唯一的id
         * txid为了防止订阅端在数据处理时“乱序”,这就要求订阅者需要解析message
         * @param channel
         * @param message
         */
        public void publish(String channel, String message) {
            Long txid = jedis.incr("MESSAGE_TXID");
            String content = txid + "/" + message;
            this.put(content);
            jedis.publish(channel, content);//为每个消息设定id,最终消息格式1000/messageContent
        }
    
        public void close(String channel){
            jedis.publish(channel, "exit");
            jedis.del(channel);//删除
        }
    }

     

    哎...今天够累的,签到来了1...
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|小黑屋|Java自学者论坛 ( 声明:本站文章及资料整理自互联网,用于Java自学者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-12-22 12:59 , Processed in 0.090572 second(s), 29 queries .

    Powered by Discuz! X3.4

    Copyright © 2001-2021, Tencent Cloud.

    快速回复 返回顶部 返回列表