Java自学者论坛

 找回密码
 立即注册

手机号码,快捷登录

恭喜Java自学者论坛(https://www.javazxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,会员资料板块,购买链接:点击进入购买VIP会员

JAVA高级面试进阶训练营视频教程

Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程Go语言视频零基础入门到精通Java架构师3期(课件+源码)
Java开发全终端实战租房项目视频教程SpringBoot2.X入门到高级使用教程大数据培训第六期全套视频教程深度学习(CNN RNN GAN)算法原理Java亿级流量电商系统视频教程
互联网架构师视频教程年薪50万Spark2.0从入门到精通年薪50万!人工智能学习路线教程年薪50万大数据入门到精通学习路线年薪50万机器学习入门到精通教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程MySQL入门到精通教程
查看: 865|回复: 0

07: redis分布式锁解决超卖问题

[复制链接]
  • TA的每日心情
    奋斗
    2024-11-24 15:47
  • 签到天数: 804 天

    [LV.10]以坛为家III

    2053

    主题

    2111

    帖子

    72万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    726782
    发表于 2021-6-17 15:33:55 | 显示全部楼层 |阅读模式

    1.1 redis事物

      1、redis事物介绍

          1. redis事物是可以一次执行多个命令,本质是一组命令的集合。

          2. 一个事务中的所有命令都会序列化,按顺序串行化的执行而不会被其他命令插入

          作用:一个队列中,一次性、顺序性、排他性的执行一系列命令 

      2、multi 指令基本使用

          1. 下面指令演示了一个完整的事物过程,所有指令在exec前不执行,而是缓存在服务器的一个事物队列中

          2. 服务器一旦收到exec指令才开始执行事物队列,执行完毕后一次性返回所有结果

          3. 因为redis是单线程的,所以不必担心自己在执行队列是被打断,可以保证这样的“原子性”

          注:redis事物在遇到指令失败后,后面的指令会继续执行

    # Multi 命令用于标记一个事务块的开始事务块内的多条命令会按照先后顺序被放进一个队列当中,最后由 EXEC 命令原子性( atomic )地执行
    > multi(开始一个redis事物)
    incr books
    incr books
    > exec (执行事物)
    > discard (丢弃事物)
    [root@redis ~]# redis-cli
    127.0.0.1:6379> multi
    OK
    127.0.0.1:6379> set test 123
    QUEUED
    127.0.0.1:6379> exec
    1) OK
    127.0.0.1:6379> get test
    "123"
    127.0.0.1:6379> multi
    OK
    127.0.0.1:6379> set test 456
    QUEUED
    127.0.0.1:6379> discard
    OK
    127.0.0.1:6379> get test
    "123"
    127.0.0.1:6379> 
    在命令行测试redis事物
    #! /usr/bin/env python
    # -*- coding: utf-8 -*-
    import redis
    r = redis.Redis(host='127.0.0.1')
    pipe = r.pipeline()
    pipe.multi()             #开启事务
    pipe.set('key2', 4)      #存储子命令
    pipe.execute()           #执行事务
    print(r.get('key2'))
    使用python测试redis事物

        注:mysql的rollback与redis的discard的区别

            1. mysql回滚为sql全部成功才执行,一条sql失败则全部失败,执行rollback后所有语句造成的影响消失

            2. redis的discard只是结束本次事务,正确命令造成的影响仍然还在.

              1)redis如果在一个事务中的命令出现错误,那么所有的命令都不会执行
              2)redis如果在一个事务中出现运行错误,那么正确的命令会被执行

      3、watch 指令作用

          实质:WATCH 只会在数据被其他客户端抢先修改了的情况下通知执行命令的这个客户端(通过 WatchError 异常)但不会阻止其他客户端对数据的修改

          1. watch其实就是redis提供的一种乐观锁,可以解决并发修改问题

          2. watch会在事物开始前盯住一个或多个关键变量,当服务器收到exec指令要顺序执行缓存中的事物队列时,redis会检查关键变量自watch后是否被修改

          3. WATCH 只会在数据被其他客户端抢先修改了的情况下通知执行命令的这个客户端(通过 WatchError 异常)但不会阻止其他客户端对数据的修改

    1.2 setnx(redis分布式锁)

      1、分布式锁

          1. 分布式锁本质是占一个坑,当别的进程也要来占坑时发现已经被占,就会放弃或者稍后重试

          2. 占坑一般使用 setnx(set if not exists)指令,只允许一个客户端占坑

          3. 先来先占,用完了在调用del指令释放坑

    > setnx lock:codehole true
    .... do something critical ....
    > del lock:codehole

          4. 但是这样有一个问题,如果逻辑执行到中间出现异常,可能导致del指令没有被调用,这样就会陷入死锁,锁永远无法释放

          5. 为了解决死锁问题,我们拿到锁时可以加上一个expire过期时间,这样即使出现异常,当到达过期时间也会自动释放锁

    > setnx lock:codehole true
    > expire lock:codehole 5
    .... do something critical ....
    > del lock:codehole

          6. 这样又有一个问题,setnx和expire是两条指令而不是原子指令,如果两条指令之间进程挂掉依然会出现死锁

          7. 为了治理上面乱象,在redis 2.8中加入了set指令的扩展参数,使setnx和expire指令可以一起执行

    > set lock:codehole true ex 5 nx
    ''' do something '''
    > del lock:codehole

     1.3 redis解决超卖问题

      1、使用reids的 watch + multi 指令实现

    #! /usr/bin/env python
    # -*- coding: utf-8 -*-
    import redis
    def sale(rs):
        while True:
            with rs.pipeline() as p:
                try:
                    p.watch('apple')                   # 监听key值为apple的数据数量改变
                    count = int(rs.get('apple'))
                    print('拿取到了苹果的数量: %d' % count)
                    p.multi()                          # 事务开始
                    if count> 0 :                      # 如果此时还有库存
                        p.set('apple', count - 1)
                        p.execute()                    # 执行事务
                    p.unwatch()
                    break                              # 当库存成功减一或没有库存时跳出执行循环
                except Exception as e:                 # 当出现watch监听值出现修改时,WatchError异常抛出
                    print('[Error]: %s' % e)
                    continue                           # 继续尝试执行
    
    rs = redis.Redis(host='127.0.0.1', port=6379)      # 连接redis
    rs.set('apple',1000)                               # # 首先在redis中设置某商品apple 对应数量value值为1000
    sale(rs)
    watch+multi解决超卖问题

        1)原理

            1. 当用户购买时,通过 WATCH 监听用户库存,如果库存在watch监听后发生改变,就会捕获异常而放弃对库存减一操作

            2. 如果库存没有监听到变化并且数量大于1,则库存数量减一,并执行任务

         2)弊端

            1. Redis 在尝试完成一个事务的时候,可能会因为事务的失败而重复尝试重新执行

            2. 保证商品的库存量正确是一件很重要的事情,但是单纯的使用 WATCH 这样的机制对服务器压力过大

       2、使用reids的 watch + multi + setnx  指令实现

         1)为什么要自己构建锁

            1. 虽然有类似的 SETNX 命令可以实现 Redis 中的锁的功能,但他锁提供的机制并不完整

            2. 并且setnx也不具备分布式锁的一些高级特性,还是得通过我们手动构建

        2)创建一个redis锁

            1. 在 Redis 中,可以通过使用 SETNX 命令来构建锁:rs.setnx(lock_name, uuid值)

            2. 而锁要做的事情就是将一个随机生成的 128 位 UUID 设置位键的值,防止该锁被其他进程获取

         3)释放锁

            1. 锁的删除操作很简单,只需要将对应锁的 key 值获取到的 uuid 结果进行判断验证

            2. 符合条件(判断uuid值)通过 delete 在 redis 中删除即可,pipe.delete(lockname)

            3. 此外当其他用户持有同名锁时,由于 uuid 的不同,经过验证后不会错误释放掉别人的锁

         4)解决锁无法释放问题

            1. 在之前的锁中,还出现这样的问题,比如某个进程持有锁之后突然程序崩溃,那么会导致锁无法释放

            2. 而其他进程无法持有锁继续工作,为了解决这样的问题,可以在获取锁的时候加上锁的超时功能

    #! /usr/bin/env python
    # -*- coding: utf-8 -*-
    import redis
    import uuid
    import time
    
    # 1.初始化连接函数
    def get_conn(host,port=6379):
        rs = redis.Redis(host=host, port=port)
        return rs
    
    # 2. 构建redis锁
    def acquire_lock(rs, lock_name, expire_time=10):
        '''
        rs: 连接对象
        lock_name: 锁标识
        acquire_time: 过期超时时间
        return -> False 获锁失败 or True 获锁成功
        '''
        identifier = str(uuid.uuid4())
        end = time.time() + expire_time
        while time.time() < end:
            # 当获取锁的行为超过有效时间,则退出循环,本次取锁失败,返回False
            if rs.setnx(lock_name, identifier): # 尝试取得锁
                return identifier
            time.sleep(.001)
            return False
    
    # 3. 释放锁
    def release_lock(rs, lockname, identifier):
        '''
        rs: 连接对象
        lockname: 锁标识
        identifier: 锁的value值,用来校验
        '''
        pipe = rs.pipeline(True)
        try:
            pipe.watch(lockname)
            if rs.get(lockname).decode() == identifier:  # 防止其他进程同名锁被误删
                pipe.multi()           # 开启事务
                pipe.delete(lockname)
                pipe.execute()
                return True            # 删除锁
            pipe.unwatch()              # 取消事务
        except Exception as e:
            pass
        return False                    # 删除失败
    
    
    '''在业务函数中使用上面的锁'''
    def sale(rs):
        start = time.time()            # 程序启动时间
        with rs.pipeline() as p:
            '''
            通过管道方式进行连接
            多条命令执行结束,一次性获取结果
            '''
            while True:
                lock = acquire_lock(rs, 'lock')
                if not lock: # 持锁失败
                    continue
                try:
                    count = int(rs.get('apple')) # 取量
                    p.set('apple', count-1)      # 减量
                    p.execute()
                    print('当前库存量: %s' % count)
                    break
                finally:
                    release_lock(rs, 'lock', lock)
            print('[time]: %.2f' % (time.time() - start))
    
    rs = redis.Redis(host='127.0.0.1', port=6379)      # 连接redis
    rs.set('apple',1000)                               # # 首先在redis中设置某商品apple 对应数量value值为1000
    sale(rs)
    setnx+watch+multi解决超卖问题
    def acquire_expire_lock(rs, lock_name, expire_time=10, locked_time=10):
        '''
        rs: 连接对象
        lock_name: 锁标识
        acquire_time: 过期超时时间
        locked_time: 锁的有效时间
        return -> False 获锁失败 or True 获锁成功
        '''
        identifier = str(uuid.uuid4())
        end = time.time() + expire_time
        while time.time() < end:
            # 当获取锁的行为超过有效时间,则退出循环,本次取锁失败,返回False
            if rs.setnx(lock_name, identifier): # 尝试取得锁
                # print('锁已设置: %s' % identifier)
                rs.expire(lock_name, locked_time)
                return identifier
            time.sleep(.001)
        return False
    优化:给分布式锁加超时时间防止死锁

     

    哎...今天够累的,签到来了1...
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|小黑屋|Java自学者论坛 ( 声明:本站文章及资料整理自互联网,用于Java自学者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2025-1-23 03:07 , Processed in 0.059726 second(s), 29 queries .

    Powered by Discuz! X3.4

    Copyright © 2001-2021, Tencent Cloud.

    快速回复 返回顶部 返回列表