sqlachemy 是python的orm框架,在使用一段时间后,我们通常会出现事务嵌套的情况,看到很多人写代码的时候,居然是session到处传递,这无疑是加大了代码之间的耦合度。
案例:
def save(session):
# TODO
def update(session):
# TODO
def service():
session = getSession();
try:
save(session);
update(session);
session.commit();
except Exception as e:
session.rollback();
finally:
if not session:
session.close();
假设save和update是同一个事务,但是上述的实践缺强制了save和update的 session相耦合来达成,好的实践应该是:save和update无任何关系,只是在实现业务逻辑时,组合到一个事务,确保事务性即可,即:
def service():
try:
save();
update();
except Exception as e:
# TODO
因此如何解决这个问题是本文需要阐述的主题。
解决方案是:
ACID是事务的四个基本特征,通常我们的理解事务是一个操作单元,要么一起成功要么一起失败(原子性);通过一个例子来直接说明如何解决的。
场景:注册用户(添加一个用户,会未用户送10个积分)
创建表
create table user(
id int not null auto_increment,
name varchar(255) not null default '' comment '用户名',
created_at datetime not null default current_timestamp comment '创建时间',
updated_at datetime not null default current_timestamp comment '更新时间',
primary key (`id`)
)engine innodb charset=utf8 comment '用户表';
create table user_credits(
id int not null auto_increment,
user_id int not null default 0 comment '用户ID',
user_name varchar(255) not null default '' comment '用户名',
score int not null default 0 comment '积分',
created_at datetime not null default current_timestamp comment '创建时间',
updated_at datetime not null default current_timestamp comment '更新时间',
primary key (`id`),
unique key `uk_user_id` (`user_id`)
)engine innodb charset=utf8 comment '用户积分表';
sqlalchemy 实现
# -*- coding:utf-8 -*-
import datetime
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session
engine = create_engine("mysql+pymysql://root:root@localhost:3306/csdn", echo=True)
# 必须使用scoped_session,域session可以将session进行共享
DBSession = scoped_session(sessionmaker(bind=engine))
BaseModel = declarative_base()
# ----------- Relation Model Object---------------- #
class User(BaseModel):
__tablename__ = "user"
id = Column(Integer, primary_key=True)
name = Column(String)
created_at = Column(DateTime, default=datetime.datetime.now)
updated_at = Column(DateTime, default=datetime.datetime.now)
class UserCredits(BaseModel):
__tablename__ = "user_credits"
id = Column(Integer, primary_key=True)
user_id = Column(Integer)
user_name = Column(String)
score = Column(Integer)
created_at = Column(DateTime, default=datetime.datetime.now)
updated_at = Column(DateTime, default=datetime.datetime.now)
# ----------- Service implements---------------- #
def add_user(user):
" 添加用户 "
session = DBSession()
try:
session.add(user)
session.commit()
except Exception as e:
session.rollback()
print("AddUser: ======={}=======".format(e))
finally:
if not session:
session.close()
def add_user_credits(userCredits, interrupt=True):
" 添加用户积分记录 "
session = DBSession()
try:
if interrupt:
raise Exception("--- interrupt ---")
session.add(userCredits)
session.commit()
except Exception as e:
session.rollback()
print("AddUserCredits: ======={}=======".format(e))
finally:
if not session:
session.close()
def regist_user():
session = DBSession()
try:
# 开启子事务
session.begin(subtransactions=True)
# TODO Service
user = User(name='wangzhiping')
add_user(user)
add_user_credits(UserCredits(
user_id=user.id,
user_name=user.name,
score=10
), False)
session.commit()
except Exception as e:
session.rollback()
print("AddUserCredits: ======={}=======".format(e))
finally:
if not session:
session.close()
# ---------- exec -----------
regist_user()
1,设置session时,需要指定为scoped_session,目的是session可以共享(ThreadLocal);
2,session.begin(subtransactions=True) 开启子事务管理;
这是实际上regist_user是在同一个线程中的session,这是add_user,add_user_credits实际上session是同一个,所以可以实现。其实这个可以更进一步扩展,把事务隔离级别,传播属性,这里不做介绍
---------------------
作者:紫守笨
来源:CSDN
原文:https://blog.csdn.net/program_red/article/details/55194130?utm_source=copy
版权声明:本文为博主原创文章,转载请附上博文链接!
转载:https://blog.csdn.net/program_red/article/details/55194130 |