先推荐一下码云上的一个GVP(最有价值的开源项目) AgileBPM(下面简称ab),我下面讲解的方案也是它的Bo支持多数据源操作事务管理器,友情链接:http://doc.agilebpm.cn/
目前是解决的是处理单系统内的多数据源问题,简单来说就是在单系统中的一个线程内,保护多个数据源事务,这也是ab项目所需要的场景。
参考了码云上的开源的lcn分布式事务解决方案,觉得再拓展一下也是可以解决微服务间的分布式事务处理,利用redis放一个事务处理的共同空间,然后在共同空间内来统筹事务,不过它处理commit异常的问题也是用通用方式(commit失败很多项目都是采取tcc的方式处理)。
ps:之前本人试过使用jta事务管理器,这个性能真看不下去。一会就卡。。所以就想着自己定义个管理器,自己来释放资源。
1 用AbstractRoutingDataSource让系统支持多数据源
动态数据源配置:
真正的数据源(druid数据源):
展示一下DynamicDataSource是继承了AbstractRoutingDataSource的实现,这里不是重点。
2 实现支持这种路由数据源的事务管理器
先继承AbstractPlatformTransactionManager(事务管理器的抽象类,我们很常用的DataSourceTransactionManager就是继承它的)
里面需要实现几个关键点就行(笔者只考虑了事务传播性为PROPAGATION_REQUIRED的情况,这也是项目最常用的,其他我没支持,毕竟是定制化的事务管理器)
package com.dstz.bus.service.impl;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import javax.sql.DataSource;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.datasource.ConnectionHolder;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.transaction.CannotCreateTransactionException;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionSystemException;
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
import org.springframework.transaction.support.DefaultTransactionStatus;
import org.springframework.transaction.support.ResourceTransactionManager;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import com.dstz.base.core.util.AppUtil;
import com.dstz.base.core.util.ThreadMapUtil;
import com.dstz.base.db.datasource.DataSourceUtil;
import com.dstz.base.db.datasource.DbContextHolder;
import com.dstz.base.db.datasource.DynamicDataSource;
/**
* <pre>
* 描述:ab 结合sys多数据源操作 专门为bo db实例化做的事务管理器
* 它只保护系统数据源(包含dataSourceDefault),不会保护datasource
* 其实可以做到,但是这个事务管理器目前只为bo多数据源的保护,所以我没支持
* 作者:aschs
* 邮箱:aschs@qq.com
* 日期:2018年10月10日
* 版权:summer
* </pre>
*/
public class AbDataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean {
private int i = 0;
@Override
public void afterPropertiesSet() throws Exception {
logger.debug("ab的事务管理器已就绪");
}
@Override
public Object getResourceFactory() {
return DataSourceUtil.getDataSourceByAlias(DataSourceUtil.GLOBAL_DATASOURCE);
}
/**
* <pre>
* 生成一个在整个事务处理都用到的资源
* 这里我放了在过程中的所有连接 Map<数据源别名,连接>
* </pre>
*/
@Override
protected Object doGetTransaction() {
return new HashMap<String, Connection>();
}
/**
* 判断是否已存在事务
*/
@Override
protected boolean isExistingTransaction(Object transaction) {
return (boolean) ThreadMapUtil.getOrDefault("abTransactionManagerExist", false);
}
/**
* <pre>
* 必须实现的一个方法,设置线程内的事务为回滚状态。
* 这里其实是为了预防传播性设置为 让线程内可以多次管理器操作的情况下,用来通知大家不要只做回滚,别commit了。
* 在该事务管理器只支持PROPAGATION_REQUIRED 的情况下(线程只有一个管理器操作),没多大用,只是必须要实现这个
* 不然抽象类那里会有报错代码。
* </pre>
*/
@Override
protected void doSetRollbackOnly(DefaultTransactionStatus status) {
ThreadMapUtil.put("abTransactionManagerRollbackOnly", true);//标记ab事务管理器在线程内已准备要回滚了
}
/**
* <pre>
* 准备事务,获取链接
* </pre>
*/
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
logger.info("分布式事务开始:"+i);
Map<String, Connection> conMap = (Map<String, Connection>) transaction;
Map<String, DataSource> dsMap = DataSourceUtil.getDataSources();
// 遍历系统中的所有数据源,打开连接
for (Entry<String, DataSource> entry : dsMap.entrySet()) {
Connection con = null;
try {
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(entry.getValue());
if (conHolder == null) {
con = entry.getValue().getConnection();
con.setAutoCommit(false);
// 缓存链接
TransactionSynchronizationManager.bindResource(entry.getValue(), new ConnectionHolder(con));
} else {
con = conHolder.getConnection();
}
//系统数据源放进资源里
if(DbContextHolder.getDataSource().equals(entry.getKey())) {
DynamicDataSource dynamicDataSource = (DynamicDataSource) AppUtil.getBean(DataSourceUtil.GLOBAL_DATASOURCE);
TransactionSynchronizationManager.bindResource(dynamicDataSource, new ConnectionHolder(con));
}
conMap.put(entry.getKey(), con);
logger.debug("数据源别名[" + entry.getKey() + "]打开连接成功");
} catch (Throwable ex) {
doCleanupAfterCompletion(conMap);
throw new CannotCreateTransactionException("数据源别名[" + entry.getKey() + "]打开连接错误", ex);
}
}
ThreadMapUtil.put("abTransactionManagerExist", true);//标记ab事务管理器已经在线程内启动了
}
@Override
protected void doCommit(DefaultTransactionStatus status) {
Map<String, Connection> conMap = (Map<String, Connection>) status.getTransaction();
for (Entry<String, Connection> entry : conMap.entrySet()) {
try {
entry.getValue().commit();
logger.debug("数据源别名[" + entry.getKey() + "]提交事务成功");
} catch (SQLException ex) {
doCleanupAfterCompletion(conMap);
throw new TransactionSystemException("数据源别名[" + entry.getKey() + "]提交事务失败", ex);
}
}
logger.info("分布式事务提交:"+i);
}
/**
* 回滚
*/
@Override
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
Map<String, Connection> conMap = (Map<String, Connection>) status.getTransaction();
for (Entry<String, Connection> entry : conMap.entrySet()) {
try {
entry.getValue().rollback();
logger.debug("数据源别名[" + entry.getKey() + "]回滚事务成功");
} catch (SQLException ex) {
doCleanupAfterCompletion(conMap);
throw new TransactionSystemException("数据源别名[" + entry.getKey() + "]回滚事务失败", ex);
}
}
logger.info("分布式事务回滚:"+i);
}
/**
* 回收链接
*/
@Override
protected void doCleanupAfterCompletion(Object transaction) {
Map<String, Connection> conMap = (Map<String, Connection>) transaction;
for (Entry<String, Connection> entry : conMap.entrySet()) {
DataSource dataSource = DataSourceUtil.getDataSourceByAlias(entry.getKey());
TransactionSynchronizationManager.unbindResource(dataSource);
DataSourceUtils.releaseConnection(entry.getValue(), dataSource);
logger.debug("数据源别名[" + entry.getKey() + "]关闭链接成功");
}
//最后把本地资源也释放了
DynamicDataSource dynamicDataSource = (DynamicDataSource) AppUtil.getBean(DataSourceUtil.GLOBAL_DATASOURCE);
TransactionSynchronizationManager.unbindResource(dynamicDataSource);
ThreadMapUtil.remove("abTransactionManagerExist");
ThreadMapUtil.remove("abTransactionManagerRollbackOnly");
ThreadMapUtil.remove();
logger.info("分布式事务释放:"+(i++));
}
}
事务管理器的方法调用顺序和时机大概说一下:
1 doGetTransaction方法:来初始化事务处理过程中的公共资源,后面调用的其他方法都是以它为媒介的。
2 doBegin方法:开始事务操作,主要是打开数据源的链接,记得要放到事务资源管理服务中TransactionSynchronizationManager,非常重要,因为这个过程中用到的jdbc操作是从这里面拿的。
3 doCommit(doRollback):如题,把获取的链接提交或者回滚操作。
4 doCleanupAfterCompletion:回收链接资源
至此,事务管理器的逻辑已经结束了~
最后,实现务必实现isExistingTransaction,用来处理重复线程内多次触发了事务切面的逻辑
这里笔者用简单的线程变量来标记是否线程内已存在了事务管理,因为我只支持PROPAGATION_REQUIRED传播性,所以没考虑内部嵌入的其他情况,其实也是内部commit一下,资源肯定是最后统一释放的。
3 使用自定义事务管理器
先提一下,这里笔者只保护会使用到多数据源的模块,其实大部分系统逻辑还是用DataSourceTransactionManager就够,不需要保护太多数据源(因为释放和打开链接是有性能损耗的)。
可以看出,主要的逻辑系统还是使用传统管理器,然后在特定地方声明特殊管理器则可:
5 到这里,整个分布式事务管理已完成了,主要是利用了路由数据源AbstractRoutingDataSource和自定义事务管理器实现的~
6 AgileBPM的多数据源结合展示(可跳过)
这里展示一下,这个开源的流程系统的强大可配置性(让人发指的灵活性-。-)的数据源管理功能。
数据源模板,在这里你可以使用定义不同的数据源实现类,只要在项目import就行
这里有一个内置的阿里的数据源,后面有需要你可以增加其他模板,例如BasicDataSOurce这个,最常用的数据源。
有了模板,就可以新建数据源了:
这里的是特殊默认数据源,系统本地数据源,用户可以随便添加。
然后,我就基于AgileBPM的强大数据源管理下,进行了分布式数据源测试。测试逻辑很简单,就是在一个线程内,操作多个数据源,然后看一下会不会一起回滚。
这里展示了一下,AgileBPM中使用数据源的便捷性,根据配置的别名,直接拿来代码开发则可,测试代码比较随意了,能保证一致性。
7 这样的实现在压测中的表现
本人用的是jmetter来压测事务处理,它的表现跟传统的DataSourceTransactionManager表现是一样的!!!!(虽然过程遇到了线程变量的坑,但已修复)。
配置,400进程同时施压:
这是压测结果
这是日志输出,我故意输出了每一次获取链接,提交,和释放的事务处理过程
8 挫败:原来行业内的问题主要卡在commit上。
由于对分布式事务产生极大兴趣,所以专研了一下,这么简单的实现为啥别人都觉得是打问题呢?原来是因为commit会出错的情况,第一个链接commit成功后,第二个链接commit失败,那么第一个链接已不能回滚了!!!!所以行业内大部分方案都在处理这种情况,虽然到了commit阶段,数据库已经对相关资源产生了写锁,数据也写入磁盘,就等commit刷进去了,产生错误的概率是极少了。作为行业内的大难题,很多方案在处理这个问题。什么2pc原则。。等等,有空我整理一下。大部分主流项目解决方案还是tcc为主,毕竟这个最通用直接。
8.1顿悟!!
顿悟!!其实我以上这种实现方案就是2pc的实现方案,在jdbc都操作了sql没问题后,再一并提交的方案就是2pc。但是2pc有这个commit提交存在的设计缺陷(这种时机是存在很少可能性的),所以别人就提出tcc和消息队列的解决commit异常的更可靠的方案(但是,只要是串行逻辑就没有百分百可靠的方案,只是降低了可能性罢了)。所以,ab项目关于分布式是采用了2pc的解决方案,顺带提一下jta事务也是类似的逻辑,不过他们的性能主要卡在消息通知上。例如所有链接操作sql都成功了,我需要通知AB链接去提交,我通知了A,A提交成功,然后我通知B,B没收到消息,那么AB资源都会卡住不释放,然后B会超时导致回滚了。所以,jta在消息通知上比较损耗性能……关于2pc的友情链接:
落寞的流月城(632266504) 14:13:42 https://cloud.tencent.com/developer/article/1355859
9关于AB项目的多数据bo场景方案
在经历挫败之后,理性分析了一下,其实当前这种方案已经满足了ab的分布式事务处理的需求了。首先,其实commit失败的场景是少之又少,笔者调整了逻辑,后面把重要的系统数据源放在最后提交,保证了系统数据源的强一致性,也就是说保证了流程数据的一致性。
原因分析,这里细想一个场景,我先把业务数据从1改成2,然后驱动流程流转,假如我的bo是其他数据源A,A先提交成功,但是系统的本地数据源B提交失败了,那么导致B操作的流程数据会回滚,但是A的数据已提交无法回滚。结果是,流程没有流转,但是业务数据已更新了为2了。这种场景在ab中,相当于,我操作了一下业务数据的保存,因为流程没有变,只是保存了一下数据,对于流程系统本身来说,有时候还是好事,因为虽然流程流转失败了,但是业务数据不想再填写一次。所以我说这种方案已经满足ab项目的多数据源下的分布式场景的需求了。
当然,如果用户还是执着于所有数据源的强一致性,在ab项目中可以在bo保存前,先备份一下bo数据,然后在doCommit时恢复备份数据则可以,ab里有很多时机插件,定义了一些时机插件列表,然后你多实现了插件则会运行,ab的插件代码展示,如下:
ab作为面向技术人员的流程系统,里面内嵌提供了丰富的便捷开发的写法和实现。
|