# -*- coding: utf-8 -*-
# Demo: 300-5000 people compete for 100 tickets; the tickets must never be oversold.
import redis
import time
import threading
from redis import WatchError
from redis_lock import synchronized
# Connection settings for the Redis instance the demo runs against.
REDIS_DATABASE = {
    'HOST': 'localhost',
    'PORT': 6379,
    'DB': 0,
}
TICKET_NUM = 100  # number of tickets on sale
PEOPLE_NUM = 3000  # number of buyers (one thread each)

rds = redis.Redis(
    host=REDIS_DATABASE['HOST'],
    port=REDIS_DATABASE['PORT'],
    db=REDIS_DATABASE['DB'],
)

# Reset the shared counter: after the delete, INCR by TICKET_NUM leaves the
# key holding exactly TICKET_NUM.
rds.delete('ticket_num')
rst = rds.incr('ticket_num', amount=TICKET_NUM)

# Reset the ticket list used by the list-based strategy: one (empty)
# placeholder element per ticket.
rds.delete('tickets')
values = [''] * TICKET_NUM
tickets = rds.lpush('tickets', *values)
class TestRedis(threading.Thread):
    """Thread simulating one buyer who tries to take a ticket.

    Every strategy method is a different way of decrementing the shared
    ``ticket_num`` counter stored in Redis; ``run`` picks the one that is
    exercised.
    """

    def __init__(self, t_num):
        """
        :param t_num: index of this buyer; only used in log output
        """
        self.t_num = t_num
        super(TestRedis, self).__init__()

    def run(self):
        """Thread entry point: execute the currently selected strategy."""
        # Strategy switch for the demo - enable exactly one of the lines below.
        # self.error_examples()    # broken on purpose: oversells under threads
        # self.optimistic_lock()   # Redis transaction (optimistic lock)
        # self.pessimistic_lock()  # hand-rolled pessimistic lock, slightly faster than the optimistic one
        # self.redis_list()        # list pop; relies on Redis running commands one at a time
        self.redis_incr()  # recommended: counter ops; relies on Redis running commands one at a time

    def error_examples(self):
        """
        Counter-example: an unprotected read-modify-write that oversells
        when several threads run it concurrently.
        :return:
        """
        ticket_num = int(rds.get('ticket_num'))
        time.sleep(0.1)  # widen the race window so the oversell is easy to see
        if ticket_num > 0:
            print('t_num=%s, ticket_num=%s' % (self.t_num, ticket_num))
            rds.set('ticket_num', ticket_num-1)

    def optimistic_lock(self):
        """
        Optimistic lock: WATCH the counter, decrement it inside MULTI/EXEC and
        retry whenever another client modified it in between.
        :return:
        """
        while True:
            with rds.pipeline(transaction=True) as r_pip:
                try:
                    r_pip.watch('ticket_num')
                    # Read through the watching pipeline before MULTI so the
                    # value checked is the one the transaction is guarded on.
                    ticket_num = int(r_pip.get('ticket_num'))
                    if ticket_num <= 0:
                        # Sold out: nothing to decrement. Leaving the ``with``
                        # block resets the pipeline (dropping the WATCH).
                        return
                    r_pip.multi()
                    r_pip.decr('ticket_num')
                    r_pip.execute()
                    return
                except WatchError:
                    # The counter changed after WATCH; start over.
                    continue

    @synchronized(rds, "lock", 1000)
    def pessimistic_lock(self):
        """
        Pessimistic lock: the whole check-and-decrement runs while holding the
        Redis lock named "lock" (see the ``synchronized`` decorator).
        :return:
        """
        ticket_num = int(rds.get('ticket_num'))
        if ticket_num > 0:
            rds.decr('ticket_num')

    def redis_list(self):
        """
        List-based strategy: pop one placeholder from the ``tickets`` list and
        only decrement the counter when a placeholder was obtained.
        Drawback (per the original author): uses extra memory.
        :return:
        """
        ticket = rds.lpop('tickets')
        if ticket is not None:
            rds.decr('ticket_num')

    def redis_incr(self):
        """
        Counter-based strategy: decrement first, then compensate if the
        decrement pushed the counter below zero.
        :return:
        """
        time.sleep(0.1)
        if int(rds.get('ticket_num')) > 0:
            # When only one ticket is left, several threads may all read 1 and
            # decrement, so the counter can go negative ("oversell").
            de_num = rds.decr('ticket_num')
            if de_num < 0:
                # Give the over-decremented unit back and take no ticket.
                print('Overshoot callback %s' % de_num)
                rds.incr('ticket_num')
# Driver: create one buyer thread per person, run them all concurrently and
# report the remaining ticket count together with the elapsed time in ms.
tests = [TestRedis(i + 1) for i in range(PEOPLE_NUM)]

s = time.time()
for t in tests:
    t.start()
for t in tests:
    t.join()
print('result ticket_num=%s, time=%s' % (rds.get('ticket_num'), (time.time()-s)*1000))
# coding: utf-8
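"""
Redis-based distributed pessimistic lock. It has to deal with the following problems:
1. If A crashes after acquiring the lock, the lock must still get released.
2. If A holds the lock for so long that it expires and B acquires it, A must not
   release B's lock by mistake once A finishes.
redis.Redis() has some quirks here; prefer connecting with redis.StrictRedis().
"""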
import math
import time
import uuid
from contextlib import contextmanager
from functools import wraps
from redis import WatchError
def acquire_lock(conn, lock_name, acquire_timeout=1, lock_timeout=1):
    """
    Try to take the lock and return the token identifying this holder.

    :param conn: Redis connection
    :param lock_name: name of the Redis key used as the lock
    :param acquire_timeout: maximum number of seconds to keep retrying;
        -1 blocks until the lock is obtained
    :param lock_timeout: number of seconds after which the lock expires on
        its own (rounded up to a whole second)
    :return: the identity string stored in the lock on success, ``None`` if
        the lock could not be obtained within ``acquire_timeout``
    """
    wait_forever = acquire_timeout == -1
    # The deadline is fixed once, before the first attempt. Recomputing it on
    # every check would always push it into the future and the timeout would
    # never fire.
    acquire_end = None if wait_forever else time.time() + acquire_timeout

    def should_acquire():
        return wait_forever or time.time() < acquire_end

    identity = str(uuid.uuid1())
    lock_timeout = int(math.ceil(lock_timeout))
    while should_acquire():
        # SET ... NX EX creates the key and its expiry in a single command.
        if conn.set(lock_name, identity, ex=lock_timeout, nx=True):
            return identity
        pttl = conn.pttl(lock_name)
        # -1 means the key has no expiry. The original author notes that the
        # legacy ``Redis`` client may report ``None`` here instead. In both
        # cases give the lock a TTL so it cannot be held forever.
        if pttl is None or pttl == -1:
            conn.expire(lock_name, lock_timeout)
        time.sleep(.1)
    return None
def release_lock(conn, lock_name, identity):
    """
    Release the lock, but only if it is still held by ``identity``.

    :param conn: Redis connection
    :param lock_name: name of the Redis key used as the lock
    :param identity: token returned by ``acquire_lock``; ``None`` (lock never
        obtained) releases nothing
    :return: ``True`` if the lock was deleted by this call, ``False`` if it is
        missing or owned by someone else
    """
    if identity is None:
        return False

    pipe = conn.pipeline(True)
    try:
        while True:
            try:
                pipe.watch(lock_name)
                holder = pipe.get(lock_name)
                # Clients without ``decode_responses`` return bytes on
                # Python 3; normalise so the comparison with the str token
                # can succeed.
                if isinstance(holder, bytes) and not isinstance(identity, bytes):
                    holder = holder.decode('utf-8', 'replace')
                if holder == identity:
                    # Delete inside MULTI/EXEC so that a change to the key
                    # after the WATCH aborts the delete with WatchError
                    # instead of removing somebody else's lock.
                    pipe.multi()
                    pipe.delete(lock_name)
                    pipe.execute()
                    return True
                pipe.unwatch()
                break
            except WatchError:
                # The lock changed between WATCH and EXEC; check again.
                pass
    finally:
        # Hand the connection back in a clean (un-watched) state.
        pipe.reset()
    return False
@contextmanager
def lock(conn, lock_name, lock_timeout):
    """
    Context manager that holds the Redis lock for the duration of the block.

        with lock(conn, "lock", 10):
            do something

    Blocks until the lock is obtained (acquire timeout of -1) and yields the
    holder token returned by ``acquire_lock``.

    :param conn: Redis connection
    :param lock_name: name of the Redis key used as the lock
    :param lock_timeout: number of seconds after which the lock expires
    """
    # Acquire before entering the try block: if acquiring itself fails there
    # is nothing to release, and a release attempt must not hide that error.
    id_ = acquire_lock(conn, lock_name, -1, lock_timeout)
    try:
        yield id_
    finally:
        release_lock(conn, lock_name, id_)
def synchronized(conn, lock_name, lock_timeout):
    """
    Decorator factory: every call of the decorated function runs while
    holding the Redis lock ``lock_name``.

        @synchronized(conn, "lock", 10)
        def fun():
            counter = int(r.get("counter"))
            counter += 1
            r.set("counter", counter)

    :param conn: Redis connection
    :param lock_name: name of the Redis key used as the lock
    :param lock_timeout: number of seconds after which the lock expires
    """
    def decorate(func):
        def locked_call(*args, **kwargs):
            with lock(conn, lock_name, lock_timeout):
                result = func(*args, **kwargs)
            return result

        # Carry over the wrapped function's name and docstring.
        return wraps(func)(locked_call)

    return decorate
if __name__ == '__main__':
    # Manual smoke test against a local Redis (database 5): exercises the
    # plain functions, the context manager and the decorator in turn.
    import redis

    r = redis.Redis("localhost", db=5)

    id_ = acquire_lock(r, "lock", acquire_timeout=1, lock_timeout=10)
    # acquire_lock returns None when it times out; only release a lock that
    # was actually obtained.
    if id_ is not None:
        release_lock(r, "lock", id_)

    with lock(r, "lock", 1):
        print("do something")

    @synchronized(r, "lock", 10)
    def fun():
        # A missing "counter" key reads as None; treat that as zero so the
        # first run does not fail.
        counter = int(r.get("counter") or 0)
        counter += 1
        r.set("counter", counter)

    for i in range(10000):
        fun()