Java自学者论坛

 找回密码
 立即注册

手机号码,快捷登录

恭喜Java自学者论坛(https://www.javazxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,会员资料板块,购买链接:点击进入购买VIP会员

JAVA高级面试进阶训练营视频教程

Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程Go语言视频零基础入门到精通Java架构师3期(课件+源码)
Java开发全终端实战租房项目视频教程SpringBoot2.X入门到高级使用教程大数据培训第六期全套视频教程深度学习(CNN RNN GAN)算法原理Java亿级流量电商系统视频教程
互联网架构师视频教程年薪50万Spark2.0从入门到精通年薪50万!人工智能学习路线教程年薪50万大数据入门到精通学习路线年薪50万机器学习入门到精通教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程MySQL入门到精通教程
查看: 359|回复: 0

Redis缓存穿透、缓存雪崩、并发问题分析与解决方案

[复制链接]
  • TA的每日心情
    奋斗
    2024-11-24 15:47
  • 签到天数: 804 天

    [LV.10]以坛为家III

    2053

    主题

    2111

    帖子

    72万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    726782
    发表于 2021-4-29 11:58:08 | 显示全部楼层 |阅读模式

    (一)缓存和数据库间数据一致性问题

    分布式环境下(单机就不用说了)非常容易出现缓存和数据库间的数据一致性问题,针对这一点的话,只能说,如果你的项目对缓存的要求是强一致性的,那么请不要使用缓存。我们只能采取合适的策略来降低缓存和数据库间数据不一致的概率,而无法保证两者间的强一致性。合适的策略包括 合适的缓存更新策略,更新数据库后要及时更新缓存、缓存失败时增加重试机制,例如MQ模式的消息队列。

    (二)缓存击穿问题

    缓存击穿表示恶意用户模拟请求很多缓存中不存在的数据,由于缓存中都没有,导致这些请求短时间内直接落在了数据库上,导致数据库异常。这个我们在实际项目就遇到了,有些抢购活动、秒杀活动的接口API被大量的恶意用户刷,导致短时间内数据库宕机了,好在数据库是多主多从的,hold住了。

    解决方案的话:

    1、使用互斥锁排队

    业界比价普遍的一种做法,即根据key获取value值为空时,锁上,从数据库中load数据后再释放锁。若其它线程获取锁失败,则等待一段时间后重试。这里要注意,分布式环境中要使用分布式锁,单机的话用普通的锁(synchronized、Lock)就够了。

    一、加锁操作

    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>
    private static final String LOCKED_SUCCESS = "OK";
    private static final String NX = "NX";
    private static final String EXPIRE_TIME = "PX";
     
    /**
    * 获取锁
    *
    * @param jedis   redis客户端
    * @param lockKey    锁的key
    * @param uniqueId   请求标识
    * @param expireTime 过期时间
    * @return 是否获取锁
    */
    public static boolean tryDistributedLock(Jedis jedis, String lockKey, String uniqueId, long expireTime) {
        String result = jedis.set(lockKey, uniqueId, NX, EXPIRE_TIME, expireTime);
        return LOCKED_SUCCESS.equals(result);
    }

    在低版本的redis中是没有这个set方法的,至于为什么这个简单的set方法能够保证前面提到的分布式锁原则呢?看下这个set的源码参数

    /**
    * Set the string value as value of the key. The string can't be longer than 1073741824 bytes (1
    * GB).
    * @param key  唯一标识key
    * @param value   存储的值value
    * @param nxxx  可选项:NX、XX  其中NX表示当key不存在时才set值,XX表示当key存在时才set值
    * @param expx  过期时间单位,可选项:EX|PX 其中EX为seconds,PX为milliseconds
    * @param time  过期时间,单位取上一个参数
    * @return Status code reply
    */
    public String set(final String key, final String value, final String nxxx, final String expx,
        final long time) {
      checkIsInMultiOrPipeline();
      client.set(key, value, nxxx, expx, time);
      return client.getStatusCodeReply();
    }

    如上的tryDistributedLock就可以实现简单的redis分布式锁了(此set方法的原子性)

    1、set方法中nxx参数为NX,表示当key不存在时才会set值,保证了互斥性;

    2、set值的同时设置过期时间(过期后del此key),客户端宕机或网络延迟时不会一直持有锁,避免了死锁发生;

    3、set方法中的value,比如UUID之类的,用来表示当前请求客户端的唯一性标识;

    4、因为是redis单例,暂时没有考虑容错性;

    常见的错误分布式加锁实现

    private static final Long RELEASE_SUCCESS = 1L;
     
    /**
    * 释放锁
    *
    * @param jedis     redis客户端
    * @param lockKey   锁的key
    * @param uniqueId 请求标识
    * @return 是否释放
    */
    public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String uniqueId) {
        String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(luaScript, Collections.singletonList(lockKey), Collections.singletonList(uniqueId));
        return RELEASE_SUCCESS.equals(result);
    }

    二、解锁操作

    private static final Long RELEASE_SUCCESS = 1L;
     
    /**
    * 释放锁
    *
    * @param jedis     redis客户端
    * @param lockKey   锁的key
    * @param uniqueId 请求标识
    * @return 是否释放
    */
    public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String uniqueId) {
        String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(luaScript, Collections.singletonList(lockKey), Collections.singletonList(uniqueId));
        return RELEASE_SUCCESS.equals(result);
    }

    不难看出使用Lua脚本告诉redis,如果这个key存在,且其存储的值和指定的value一致才可以删除这个key,从而释放锁,这样也保证了分布式锁的几个原则. 常见的错误释放锁会直接del这个key,没有考虑当前锁的拥有者,不符合分布式锁原则的有始有终原则;

    如果不想用上面的Lua脚本,也可以用如下代码:

    public static void releaseLock(Jedis jedis,String lockKey,String uniqueId){
          if(uniqueId.equals(jedis.get(lockKey))){
              jedis.del(lockKey);
          }
    }

     

    public String getWithLock( String key, Jedis jedis, String lockKey, String uniqueId, long expireTime )
    {
        /* 通过key获取value */
        String value = redisService.get( key );
        if ( StringUtil.isEmpty( value ) )
        {
            /*
             * 分布式锁,详细可以参考https://blog.csdn.net/fanrenxiang/article/details/79803037
             * 封装的tryDistributedLock包括setnx和expire两个功能,在低版本的redis中不支持
             */
            try {
                boolean locked = redisService.tryDistributedLock( jedis, lockKey, uniqueId, expireTime );
                if ( locked )
                {
                    value = userService.getById( key );
                    redisService.set( key, value );
                    redisService.del( lockKey );
                    return(value);
                } else {
                    /* 其它线程进来了没获取到锁便等待50ms后重试 */
                    Thread.sleep( 50 );
                    getWithLock( key, jedis, lockKey, uniqueId, expireTime );
                }
            } catch ( Exception e ) {
                log.error( "getWithLock exception=" + e );
                return(value);
            } finally {
                redisService.releaseDistributedLock( jedis, lockKey, uniqueId );
            }
        }
        return(value);
    }

    这样做思路比较清晰,也从一定程度上减轻数据库压力,但是锁机制使得逻辑的复杂度增加,吞吐量也降低了,有点治标不治本。

    2、布隆过滤器(推荐)

    bloomfilter就类似于一个hash set,用于快速判某个元素是否存在于集合中,其典型的应用场景就是快速判断一个key是否存在于某容器,不存在就直接返回。布隆过滤器的关键就在于hash算法和容器大小,下面先来简单的实现下看看效果,我这里用guava实现的布隆过滤器:

    <dependencies >
     < dependency >
     < groupId > com.google.guava</ groupId>
     < artifactId > guava</ artifactId>
     < version > 23.0 < / version >
     < / dependency >
     < / dependencies >
     public class BloomFilterTest {
         private static final int capacity    = 1000000;
         private static final int key        = 999998;
         private static BloomFilter<Integer> bloomFilter = BloomFilter.create( Funnels.integerFunnel(), capacity );
         static {
             for ( int i = 0; i < capacity; i++ )
             {
                 bloomFilter.put( i );
             }
         }
         public static void main( String[] args )
         {
     /*返回计算机最精确的时间,单位微妙*/
             long start = System.nanoTime();
             if ( bloomFilter.mightContain( key ) )
             {
                 System.out.println( "成功过滤到" + key );
             }
             long end = System.nanoTime();
             System.out.println( "布隆过滤器消耗时间:" + (end - start) );
             int sum = 0;
             for ( int i = capacity + 20000; i < capacity + 30000; i++ )
             {
                 if ( bloomFilter.mightContain( i ) )
                 {
                     sum = sum + 1;
                 }
             }
             System.out.println( "错判率为:" + sum );
         }
     }
     成 功过滤到999998
     布 隆过滤器消 耗 时间 : 215518
     错 判率 为 : 318

    可以看到,100w个数据中只消耗了约0.2毫秒就匹配到了key,速度足够快。然后模拟了1w个不存在于布隆过滤器中的key,匹配错误率为318/10000,也就是说,出错率大概为3%,跟踪下BloomFilter的源码发现默认的容错率就是0.03:

    public static < T > BloomFilter<T> create( Funnel<T> funnel, int expectedInsertions /* n */ )
    {
        return(create( funnel, expectedInsertions, 0.03 ) ); /* FYI, for 3%, we always get 5 hash functions */
    }

    要注意的是,布隆过滤器不支持删除操作。用在这边解决缓存穿透问题就是:

    public String getByKey( String key )
    {
        /* 通过key获取value */
        String value = redisService.get( key );
        if ( StringUtil.isEmpty( value ) )
        {
            if ( bloomFilter.mightContain( key ) )
            {
                value = userService.getById( key );
                redisService.set( key, value );
                return(value);
            } else {
                return(null);
            }
        }
        return(value);
    }

    (三)缓存雪崩问题

    缓存在同一时间内大量键过期(失效),接着来的一大波请求瞬间都落在了数据库中导致连接异常。

    解决方案:

    1、也是像解决缓存穿透一样加锁排队,实现同上;

    2、建立备份缓存,缓存A和缓存B,A设置超时时间,B不设值超时时间,先从A读缓存,A没有读B,并且更新A缓存和B缓存;

    public String getByKey( String keyA, String keyB )
    {
        String value = redisService.get( keyA );
        if ( StringUtil.isEmpty( value ) )
        {
            value = redisService.get( keyB );
            String newValue = getFromDbById();
            redisService.set( keyA, newValue, 31, TimeUnit.DAYS );
            redisService.set( keyB, newValue );
        }
        return(value);
    }

     

    哎...今天够累的,签到来了1...
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|小黑屋|Java自学者论坛 ( 声明:本站文章及资料整理自互联网,用于Java自学者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2025-1-22 22:02 , Processed in 0.062160 second(s), 29 queries .

    Powered by Discuz! X3.4

    Copyright © 2001-2021, Tencent Cloud.

    快速回复 返回顶部 返回列表