Java自学者论坛

 找回密码
 立即注册

手机号码,快捷登录

恭喜Java自学者论坛(https://www.javazxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,会员资料板块,购买链接:点击进入购买VIP会员

JAVA高级面试进阶训练营视频教程

Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程Go语言视频零基础入门到精通Java架构师3期(课件+源码)
Java开发全终端实战租房项目视频教程SpringBoot2.X入门到高级使用教程大数据培训第六期全套视频教程深度学习(CNN RNN GAN)算法原理Java亿级流量电商系统视频教程
互联网架构师视频教程年薪50万Spark2.0从入门到精通年薪50万!人工智能学习路线教程年薪50万大数据入门到精通学习路线年薪50万机器学习入门到精通教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程MySQL入门到精通教程
查看: 796|回复: 0

用redis解决订单超发问题的4种方法

[复制链接]
  • TA的每日心情
    奋斗
    2024-11-24 15:47
  • 签到天数: 804 天

    [LV.10]以坛为家III

    2053

    主题

    2111

    帖子

    72万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    726782
    发表于 2021-8-31 17:17:17 | 显示全部楼层 |阅读模式

    用redis解决订单超发问题的4种方法

    # -*- coding: utf-8 -*-
    # 300~5000人抢100张票,保证不超发
    import redis
    import time
    import threading
    from redis import WatchError
    from redis_lock import synchronized
    
    REDIS_DATABASE = {
        'HOST': 'localhost',
        'PORT': 6379,
        'DB': 0
    }
    TICKET_NUM = 100  # 票数
    PEOPLE_NUM = 3000  # 人数
    
    rds = redis.Redis(host=REDIS_DATABASE['HOST'], port=REDIS_DATABASE['PORT'], db=REDIS_DATABASE['DB'])
    rds.delete('ticket_num')
    rst = rds.incr('ticket_num', amount=TICKET_NUM)
    
    rds.delete('tickets')
    values = ['' for _ in xrange(TICKET_NUM)]
    tickets = rds.lpush('tickets', *values)
    
    
    class TestRedis(threading.Thread):
        def __init__(self, t_num):
            self.t_num = t_num
            super(TestRedis, self).__init__()
    
        def run(self):
            # self.error_examples()  # 错误示范,多线程下会超发
            # self.optimistic_lock()  # 利用redis自带事务(乐观锁)
            # self.pessimistic_lock  # 自实现的悲观锁,比乐观锁快一丢丢
            # self.redis_list()  # 利用redis单线程特性,队列操作
            self.redis_incr()  # 推荐方法!利用redis单线程特性,计数器操作
    
        def error_examples(self):
            """
            错误示范,多线程下会超发
            :return:
            """
            ticket_num = int(rds.get('ticket_num'))
            time.sleep(0.1)  # 加上sleep效果更明显
            if ticket_num > 0:
                print('t_num=%s, ticket_num=%s' % (self.t_num, ticket_num))
                rds.set('ticket_num', ticket_num-1)
    
        def optimistic_lock(self):
            """
            乐观锁
            :return:
            """
            while 1:
                with rds.pipeline(transaction=True) as r_pip:
                    r_pip.watch('ticket_num')
                    try:
                        r_pip.multi()
                        ticket_num = int(rds.get('ticket_num'))
                        if ticket_num > 0:
                            r_pip.decr('ticket_num')
                        r_pip.execute()
                        return
                    except WatchError:
                        r_pip.unwatch()
    
        @synchronized(rds, "lock", 1000)
        def pessimistic_lock(self):
            """
            悲观锁
            :return:
            """
            ticket_num = int(rds.get('ticket_num'))
            if ticket_num > 0:
                rds.decr('ticket_num')
    
        def redis_list(self):
            """
            减列表方式,防止超发。利用redis单线程特性
            缺点:消耗内存
            :return:
            """
            ticket = rds.lpop('tickets')
            if ticket is not None:
                rds.decr('ticket_num')
    
        def redis_incr(self):
            """
            利用redis单线程特性。
            :return:
            """
            time.sleep(0.1)
            if int(rds.get('ticket_num')) > 0:
                de_num = rds.decr('ticket_num')  # 当只剩最后一张票时,多个线程都取到1,同时减后会成负数即“超发”
                if de_num < 0:  # “超发”后补回。不继续操作
                    print('Overshoot callback %s' % de_num)
                    rds.incr('ticket_num')
    
    
    tests = []
    for i in xrange(PEOPLE_NUM):
        t = TestRedis(i+1)
        tests.append(t)
    
    s = time.time()
    for t in tests:
        t.start()
    for t in tests:
        t.join()
    
    print('result ticket_num=%s, time=%s' % (rds.get('ticket_num'), (time.time()-s)*1000))

     

    redis_lock.py

    # coding: utf-8
    """
        redis 分布式悲观锁,需要解决以下几个问题
        1、A获取锁后崩溃,需要能将锁释放
        2、A获取锁后处理时间过长,导致锁过期,被B获取,A处理完后错误的将B锁释放
        
        redis.Redis()会有些问题,连接最好使用redis.StrictRedis()
    """
    
    import math
    import time
    import uuid
    from contextlib import contextmanager
    from functools import wraps
    
    from redis import WatchError
    
    
    def acquire_lock(conn, lock_name, acquire_timeout=1, lock_timeout=1):
        """
        获取锁
        :param conn: redis连接
        :param lock_name: 锁名称
        :param acquire_timeout: 获取锁最长等待时间,-1为永久阻塞等待
        :param lock_timeout: 锁超时时间
        :return: 
        """
    
        def should_acquire():
            if acquire_timeout == -1:
                return True
            acquire_end = time.time() + acquire_timeout
            return time.time() < acquire_end
    
        identity = str(uuid.uuid1())
        lock_timeout = int(math.ceil(lock_timeout))
        while should_acquire():
            if conn.set(lock_name, identity, ex=lock_timeout, nx=True):
                return identity
            else:
                pttl = conn.pttl(lock_name)
                # Redis or StrictRedis
                # 如果使用的是Redis , 可能会存在pttl为0 但是显示为None的情况
                if pttl is None or pttl == -1:
                    conn.expire(lock_name, lock_timeout)
            time.sleep(.1)
        return None
    
    
    def release_lock(conn, lock_name, identity):
        pipe = conn.pipeline(True)
        while True:
            try:
                pipe.watch(lock_name)
                if pipe.get(lock_name) == identity:
                    pipe.delete(lock_name)
                    return True
                pipe.unwatch()
                break
            except WatchError:
                pass
        return False
    
    
    @contextmanager
    def lock(conn, lock_name, lock_timeout):
        """
        with lock(conn, "lock", 10):
            do something
        """
        id_ = None
        try:
            id_ = acquire_lock(conn, lock_name, -1, lock_timeout)
            yield id_
        finally:
            release_lock(conn, lock_name, id_)
    
    
    def synchronized(conn, lock_name, lock_timeout):
        """
        @synchronized(conn, "lock", 10)
        def fun():
            counter = int(r.get("counter"))
            counter += 1
            r.set("counter", counter)
        """
    
        def decorator(func):
            @wraps(func)
            def wrap(*args, **kwargs):
                with lock(conn, lock_name, lock_timeout):
                    return func(*args, **kwargs)
    
            return wrap
    
        return decorator
    
    
    if __name__ == '__main__':
        import redis
    
        r = redis.Redis("localhost", db=5)
    
        id_ = acquire_lock(r, "lock", acquire_timeout=1, lock_timeout=10)
        release_lock(r, "lock", id_)
        with lock(r, "lock", 1):
            print("do something")
    
    
        @synchronized(r, "lock", 10)
        def fun():
            counter = int(r.get("counter"))
            counter += 1
            r.set("counter", counter)
    
    
        for i in range(10000):
            fun()

     

    哎...今天够累的,签到来了1...
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|小黑屋|Java自学者论坛 ( 声明:本站文章及资料整理自互联网,用于Java自学者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2025-1-21 15:27 , Processed in 0.097715 second(s), 29 queries .

    Powered by Discuz! X3.4

    Copyright © 2001-2021, Tencent Cloud.

    快速回复 返回顶部 返回列表