前言
我们使用 SpringBoot + Druid 完成了单机多数据源的切换,但是这时会出现一个问题,添加事务了以后,数据源切换出现了问题,那么应该怎么解决呢?(实际是解决数据一致性的问题)
抛出异常回滚时,子事务已经提交,无法回滚,会产生数据不一致的问题。
单机
实现事务内切换数据源(支持原生Spring声明式事务哟,仅此一家),并支持多数据源事务回滚(有了 它除了跨服务的事务你需要考虑分布式事务,其他都不需要,极大的减少了系统的复杂程度)
这里采用的方案是:
- 扩展
Spring Jdbc提供的抽象类AbstractRoutingDataSource,实现切换数据源 - 数据源配置主要有两种
- 基于配置文件(该文章基于这种实现,但支持动态添加)
- 基于数据库表
- 基于
AspectJ实现动态数据源切换,支持方法级、类级,优先方法级别 - 通过实现
ibatis.Transaction和TransactionFactory支持原生Spring的@Transaction的多数据源事务回滚
环境准备
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.12.RELEASE</version>
</parent>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<druid.version>1.2.8</druid.version>
<mybatis-spring-boot.version>2.2.2</mybatis-spring-boot.version>
数据源切换
思路
扩展 Spring Jdbc 提供的抽象类 AbstractRoutingDataSource,实现切换数据源
- targetDataSources是目标数据源集合
- defaultTargetDataSource是默认数据源
- resolvedDataSources是解析后的数据源集合
- resolvedDefaultDataSource是解析后的默认数据源
- determineCurrentLookupKey 为抽象方法,通过扩展这个方法来实现数据源的切换,key是数据源的名称。
lookup key通常是绑定在线程上下文中,根据这个key去resolvedDataSources中取出DataSource
通过注解 + AOP + ThreadLocal 拦截方法后,添加数据源KEY到中,然后查询数据库时会获取到这个key,进而获取对应的数据源进行操作。
实现代码
- 首先我们需要使用一个注解作为AOP切面的拦截标识
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface DS {
String value() default DynamicDataSourceProperties.PRIMARY;
}
- 基于线程变量
ThreadLocal提供一个数据源切换的工具,采用 双端 队列存储,主要是为了支持嵌套数据源切换。
@Slf4j
public final class DataSourceContentHolder {
private static final ThreadLocal<Deque<String>> contentHolder = new NamedThreadLocal("dynamic-datasource") {
@Override
protected Object initialValue() {
// return new LinkedList<>();
return new ArrayDeque<>();
}
};
private DataSourceContentHolder() {
}
/**
* 给当前线程添加数据源
*
* @param dataSource
*/
public static void add(String dataSource) {
log.debug("添加数据源 => {}", dataSource);
contentHolder.get().add(dataSource);
}
/**
* 获取当前线程数据源
*
* @return
*/
public static String get() {
String ds = contentHolder.get().peek();
ds = StringUtils.isNotBlank(ds) ? ds : DynamicDataSourceProperties.PRIMARY;
log.debug("获取当前数据源 => {}", ds);
return ds;
}
/**
* 清空当前数据源
* 如果当前数据源不为空,则会只移除队列的元素
*/
public static void poll() {
Deque<String> queue = contentHolder.get();
String ds = queue.poll();
log.debug("移除数据源 => {}", ds);
if (queue.isEmpty()) {
contentHolder.remove();
}
}
/**
* 嵌套执行方法
*
* @param dataSource
* @param callback
*/
public static void call(String dataSource, Runnable callback) {
try {
add(dataSource);
callback.run();
} finally {
poll();
}
}
}
- 定义一个接口类并定义动态数据源的基本操作
/**
* 动态数据源配置
*/
public interface IDynamicDataSource {
/**
* 是否包含数据源
*
* @param key 数据源KEY
* @return
*/
boolean containKey(Object key);
/**
* 添加数据源
*
* @param key 数据源KEY
* @param dataSourceProperties 数据源配置
*/
void add(Object key, DataSourceProperties dataSourceProperties);
/**
* 添加数据源
*
* @param key 数据源KEY
* @param dataSource 数据源
*/
void add(Object key, DataSource dataSource);
/**
* 覆盖数据源Maps
*
* @param objectObjectMap
*/
void setMap(Map<Object, Object> objectObjectMap);
/**
* 删除数据源
*
* @param key
*/
void del(Object key);
/**
* 替换数据源
* <p>
* 如果数据源不存在则添加,存在则替换
* </p>
*
* @param key 被替换的数据源KEY
* @param dataSource 数据源
*/
void replace(Object key, DataSource dataSource);
/**
* 替换数据源
* <p>
* 如果数据源不存在则添加,存在则替换
* </p>
*
* @param key 被替换的数据源KEY
* @param dataSourceProperties 数据源配置
*/
void replace(Object key, DataSourceProperties dataSourceProperties);
/**
* 获取当前数据源
*
* @return 数据源
*/
Object get();
/**
* 通过数据源KEY,获取数据源
*
* @param key 数据源,不存在则为NULL
* @return
*/
Object get(String key);
/**
* 获取当前数据源的Key
*
* @return 数据源Key
*/
String getKey();
}
- 继承
Spring的AbstractRoutingDataSource并实现IDynamicDataSource接口完成数据源的管理
/**
* 实现多数数据源控制
*/
public class DynamicDataSource extends AbstractRoutingDataSource implements IDynamicDataSource {
private static volatile DynamicDataSource INSTANCE;
private static Map<Object, Object> dataSourceMap = new ConcurrentHashMap<>();
private static final ReentrantLock lock = new ReentrantLock();
public static DynamicDataSource getInstance() {
if (INSTANCE == null) {
synchronized (DynamicDataSource.class) {
if (INSTANCE == null) {
INSTANCE = new DynamicDataSource();
}
}
}
return INSTANCE;
}
public boolean containKey(Object key) {
boolean b = dataSourceMap.containsKey(key);
return b;
}
public void add(Object key, DataSourceProperties dataSourceProperties) {
DataSource dataSource = DataSourceFactory.createDataSource(dataSourceProperties);
add(key, dataSource);
}
public void add(Object key, DataSource dataSource) {
boolean hasKey = dataSourceMap.containsKey(key);
if (hasKey) {
// if (o instanceof DataSource) {
// DruidDataSource druidDataSource = (DruidDataSource) o;
// druidDataSource.close();
// }
throw new SXException("数据库连接池 KEY 重复");
}
dataSourceMap.put(key, dataSource);
setMap(dataSourceMap);
}
public void setMap(Map<Object, Object> objectObjectMap) {
lock.lock();
this.dataSourceMap = objectObjectMap;
setPrimary();
super.setTargetDataSources(dataSourceMap);
super.afterPropertiesSet();
lock.unlock();
}
/**
* 设置主数据源
*/
private void setPrimary() {
Object o = this.dataSourceMap.get(DynamicDataSourceProperties.PRIMARY);
if (o != null) {
this.setDefaultTargetDataSource(o);
}
}
public void del(Object key) {
Object o = dataSourceMap.get(key);
if (o != null) {
if (o instanceof DataSource) {
DruidDataSource dataSource = (DruidDataSource) o;
if (dataSource != null) {
dataSource.close();
dataSourceMap.remove(key);
setMap(dataSourceMap);
}
} else {
throw new SXException("数据池类型无法识别");
}
}
}
/**
* @param key 被替换的数据源KEY
* @param dataSource 数据源
*/
public void replace(Object key, DataSource dataSource) {
Object o = dataSourceMap.get(key);
dataSourceMap.put(key, dataSource);
if (DynamicDataSourceProperties.PRIMARY.equals(key)) {
this.setDefaultTargetDataSource(dataSource);
}
if (o != null) {
if (o instanceof DataSource) {
DruidDataSource dataSource1 = (DruidDataSource) o;
if (dataSource1 != null) {
dataSource1.close();
}
}
}
setMap(dataSourceMap);
}
/**
* @param key 被替换的数据源KEY
* @param dataSourceProperties 数据源配置
*/
public void replace(Object key, DataSourceProperties dataSourceProperties) {
DruidDataSource dataSource = DataSourceFactory.createDataSource(dataSourceProperties);
replace(key, dataSource);
}
@Override
protected Object determineCurrentLookupKey() {
return DataSourceContentHolder.get();
}
@Override
public Object get() {
return get(getKey());
}
@Override
public Object get(String key) {
return dataSourceMap.get(key);
}
@Override
public String getKey() {
return DataSourceContentHolder.get();
}
}
- 创建AOP切面,实施数据源切换拦截
@Aspect
@Component
@Slf4j
@Order(-1)
public class DataSourceAspect {
@Pointcut("@within(top.zsmile.common.datasource.annotation.DS) || @annotation(top.zsmile.common.datasource.annotation.DS)")
public void dataSourceAspect() {
}
@Before("dataSourceAspect()")
public void beforeSwitch(JoinPoint joinPoint) {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
DS methodDS = method.getAnnotation(DS.class);
String value = null;
if (methodDS != null) {
value = methodDS.value();
} else {
Class<?> aClass = joinPoint.getTarget().getClass();
DS annotation = aClass.getAnnotation(DS.class);
if (annotation != null) {
value = annotation.value();
}
}
if (checkValue(value)) {
DataSourceContentHolder.add(value);
}
}
/**
* 检查数据源是否存在
*
* @param value
* @return
*/
private boolean checkValue(final String value) {
if (!StringUtils.isEmpty(value)) {
boolean containKey = DynamicDataSource.getInstance().containKey(value);
if (!containKey) {
throw new SXException("数据源" + value + "未准备");
}
}
return true;
}
@After("dataSourceAspect()")
public void afterSwitchDS() {
DataSourceContentHolder.poll();
}
}
- 创建
Druid公共配置文件 (DruidProperties)、 动态数据源配置文件(DynamicProperties)、数据源配置文件(DataSourceProperties)
@Data
//@ConfigurationProperties(prefix = "spring.datasource.druid")
public class DataSourceProperties {
private String driverClassName;
private String url;
private String username;
private String password;
/**
* Druid默认参数
*/
private int initialSize;
private int maxActive;
private int minIdle;
private long maxWait;
private long timeBetweenEvictionRunsMillis;
private long minEvictableIdleTimeMillis;
private long maxEvictableIdleTimeMillis;
private String validationQuery;
private int validationQueryTimeout;
private Boolean testOnBorrow;
private Boolean testOnReturn;
private Boolean testWhileIdle;
private Boolean poolPreparedStatements;
private int maxOpenPreparedStatements;
private Boolean sharePreparedStatements;
private String filters;
private String connectionProperties;
}
@Data
public class DruidProperties {
/**
* Druid默认参数
*/
private int initialSize = 5;
private int maxActive = 20;
private int minIdle = 5;
private long maxWait = 60 * 1000L;
private long timeBetweenEvictionRunsMillis = 60 * 1000L;
private long minEvictableIdleTimeMillis = 1000L * 60L * 30L;
private long maxEvictableIdleTimeMillis = 1000L * 60L * 60L * 7;
private String validationQuery = "select 1 from DUAL";
private int validationQueryTimeout = -1;
private Boolean testOnBorrow = false;
private Boolean testOnReturn = false;
private Boolean testWhileIdle = true;
private Boolean poolPreparedStatements = true;
private int maxOpenPreparedStatements = -1;
private Boolean sharePreparedStatements = false;
private String filters = "stat,wall";
private String connectionProperties;
}
@Slf4j
@Getter
@Setter
@ConfigurationProperties(prefix = DynamicDataSourceProperties.PREFIX)
public class DynamicDataSourceProperties {
public static final String PREFIX = "spring.datasource.dynamic";
/**
* 默认数据源,master
*/
public static final String PRIMARY = "master";
/**
* 所有数据源配置
*/
private Map<String, DataSourceProperties> datasource = new LinkedHashMap<>();
/**
* Druid配置
*/
private DruidProperties druid = new DruidProperties();
}
- 数据源配置
@Configuration
@EnableConfigurationProperties(DynamicDataSourceProperties.class)
public class DynamicDataSourceConfig {
@Resource
private DynamicDataSourceProperties dynamicDataSourceProperties;
/**
* 动态数据源
*
* @return
*/
@ConditionalOnMissingBean
@Bean(name = "dynamicDataSource")
public DynamicDataSource dynamicDataSource() {
DynamicDataSource dynamicDataSource = DynamicDataSource.getInstance();
Map<Object, Object> dataSourceMap = getDynamicDataSource();
dynamicDataSource.setMap(dataSourceMap);
return dynamicDataSource;
}
/**
* mybatis-spring start config sql-session-factory
*
* @param dataSource
* @return
*/
@Bean
public SqlSessionFactoryBeanCustomizer sqlSessionFactoryBeanCustomizer(@Qualifier("dynamicDataSource") DynamicDataSource dataSource) {
return new SqlSessionFactoryBeanCustomizer() {
@Override
public void customize(SqlSessionFactoryBean factoryBean) {
// 动态切换数据源事务必须要添加的。
//factoryBean.setTransactionFactory(new DynamicTransactionFactory());
factoryBean.setDataSource(dataSource);
}
};
}
/**
* 事务管理器
*
* @param dynamicDataSource 动态数据源
* @return
*/
@ConditionalOnMissingBean
@Bean(name = "transactionManager")
public PlatformTransactionManager transactionManager(@Qualifier("dynamicDataSource") DynamicDataSource dynamicDataSource) {
return new DataSourceTransactionManager(dynamicDataSource);
}
/**
* 遍历数据源配置并加载
*
* @return
*/
private Map<Object, Object> getDynamicDataSource() {
DruidProperties druid = dynamicDataSourceProperties.getDruid();
Map<String, DataSourceProperties> dataSourcePropertiesMap = dynamicDataSourceProperties.getDatasource();
Map<Object, Object> dataSourceMap = new HashMap<>(dataSourcePropertiesMap.size());
dataSourcePropertiesMap.forEach((k, v) -> {
DataSourceProperties mergeProperties = DynamicDataSourceUtils.merge(v, druid);
DruidDataSource dataSource = DataSourceFactory.createDataSource(mergeProperties);
dataSourceMap.put(k, dataSource);
});
return dataSourceMap;
}
}
到目前为止,已经可以支持多数据源的切换,但是会有一个问题,那就是 添加了 Spring声明式事务注解@Transactional后就没有办法切换数据源了。
其实市面上比较成熟的Mybatis Plus提供的 多数据源也会存在这个问题,查看源代码
SpringManagedTransaction其实就可以知道原因,因为Spring 开启事务时会调getConnection()方法,其内部会缓存数据库连接。
那么问题是不是出在了数据库连接上,我们更换一下数据源的 Connection获取
多库事务问题
临时方案
如何解决切库事务问题?借助Spring的声明式事务处理,我们可以在多次切库操作时强制开启新的事务:
@DS("slave")
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
但是处理当我们在主事务中抛出异常回滚的时候,子事务已经提交了,无法回滚,这时就产生数据错乱了。
实现代码
- 先在
DynamicDataSource重写getConnection方法
/**
* 处理多数据源的事务的关键
* @return
* @throws SQLException
*/
@Override
public Connection getConnection() throws SQLException {
DataSource dataSource =
super.getResolvedDataSources().get(getKey());
return dataSource.getConnection();
}
- 实现
Transaction。注意这里的Transaction是org.apache.ibatis.transaction.Transaction
@Slf4j
public class DynamicTransaction implements Transaction {
/**
* 数据源
*/
private final DataSource dataSource;
/**
* 主数据源连接
*/
private Connection connection;
/**
* 是否开启事务
*/
private Boolean isConnectionTransactional;
/**
* 是否自动提交
*/
private Boolean autoCommit;
/**
* 连接标识
*/
private String identification;
/**
* 其他连接缓存
*/
private ConcurrentMap<String, Connection> connections;
/**
* 构造器
*
* @param dataSource 数据源
*/
public DynamicTransaction(DataSource dataSource) {
Assert.notNull(dataSource, "No DataSource specified");
// 当前数据源Key
this.identification = DataSourceContentHolder.get();
this.dataSource = dataSource;
connections = new ConcurrentHashMap<>();
log.debug("init dynamic transaction, identify key => {},address => {}", this.identification, this);
}
/**
* @return
* @throws SQLException
*/
@Override
public Connection getConnection() throws SQLException {
/* 获取当前生效的数据源标识 */
String current = DataSourceContentHolder.get();
log.debug("current key => {}, identify key=> {}", current, this.identification);
// 如果当前数据源是主数据源
if (current.equals(this.identification)) {
// 如果为空则创建连接
if (this.connection == null) {
openConnection();
}
return this.connection;
} else {
/* 不是默认数据源,获取连接并设置属性 */
if (!connections.containsKey(current)) {
// 如果连接不包含该数据源KEY获取的连接
try {
Connection conn = this.dataSource.getConnection();
/* 自动提交属性和主数据源保持连接 */
conn.setAutoCommit(this.autoCommit);
connections.put(current, conn);
} catch (SQLException ex) {
throw new CannotGetJdbcConnectionException("could not get jdbc connection", ex);
}
}
return connections.get(current);
}
}
/**
* 打开连接
*
* @throws SQLException
*/
private void openConnection() throws SQLException {
// 获取连接
this.connection = DataSourceUtils.getConnection(this.dataSource);
// 是否自动提交
this.autoCommit = this.getConnection().getAutoCommit();
// 确定当前连接是否是事务性的。
// 即通过 Spring 的事务管理器绑定到当前线程的
this.isConnectionTransactional =
DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
log.debug("jdbc connection [{}] will {} be managed by spring", this.connection, (this.isConnectionTransactional ? "" : "not"));
}
/**
* 提交事务
*
* @throws SQLException
*/
@Override
public void commit() throws SQLException {
// 如果主事务不为空 && 如果是 Spring 管理的事务性连接 && 不是自动提交
if (this.connection != null && this.isConnectionTransactional &&
!this.autoCommit) {
// 提交主连接的事务
log.debug("committing jdbc connection [{}]", this.connection);
this.connection.commit();
// 遍历提交子连接的事务
for (Connection conn : connections.values()) {
conn.commit();
}
}
}
/**
* 回滚事务
*
* @throws SQLException
*/
@Override
public void rollback() throws SQLException {
if (this.connection != null && this.isConnectionTransactional &&
!this.autoCommit) {
log.debug("rolling back jdbc connection [{}]", this.connection);
this.connection.rollback();
for (Connection conn : connections.values()) {
conn.rollback();
}
}
}
/**
* 关闭并释放连接
*
* @throws SQLException
*/
@Override
public void close() throws SQLException {
DataSourceUtils.releaseConnection(this.connection, this.dataSource);
for (Connection conn : connections.values()) {
DataSourceUtils.releaseConnection(conn, this.dataSource);
}
}
/**
* 获取事务超时时间
*
* @return
* @throws SQLException
*/
@Override
public Integer getTimeout() throws SQLException {
ConnectionHolder holder = (ConnectionHolder)
TransactionSynchronizationManager.getResource(dataSource);
if (holder != null && holder.hasTimeout()) {
return holder.getTimeToLiveInSeconds();
}
return null;
}
}
- 继承
SpringManagedTransactionFactory实现事务工厂DynamicTransactionFactory
public class DynamicTransactionFactory extends SpringManagedTransactionFactory {
@Override
public Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit) {
return new DynamicTransaction(dataSource);
}
}
- 将
DynamicTransactionFactory注入Mybaits的SqlSessionFactory
/**
* mybatis-spring start config sql-session-factory
*
* @param dataSource
* @return
*/
@Bean
public SqlSessionFactoryBeanCustomizer sqlSessionFactoryBeanCustomizer(@Qualifier("dynamicDataSource") DynamicDataSource dataSource) {
return new SqlSessionFactoryBeanCustomizer() {
@Override
public void customize(SqlSessionFactoryBean factoryBean) {
factoryBean.setTransactionFactory(new DynamicTransactionFactory()); // 添加到这里额
factoryBean.setDataSource(dataSource);
}
};
}
分布式事务
Atomikos
seata
引用
https://blog.csdn.net/qq_35789269/article/details/128125061
https://juejin.cn/post/7103913605539561509
https://blog.csdn.net/m0_73533108/article/details/126688445
https://blog.csdn.net/demohui/article/details/109659540?ops_request_misc=&request_id=&biz_id=102&utm_term=spring%20%E4%BA%8B%E5%8A%A1%E5%86%85%E5%88%87%E6%8D%A2%E6%95%B0%E6%8D%AE%E6%BA%90&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduweb~default-0-109659540.142^v88^control,239^v2^insert_chatgpt