add multi datasource support

This commit is contained in:
开源海哥 2023-03-31 16:42:40 +08:00
parent e89e8f3ef0
commit 928f2d1e49
8 changed files with 247 additions and 62 deletions

View File

@ -15,7 +15,7 @@
*/
package com.mybatisflex.core;
import com.mybatisflex.core.datasource.RoutingDataSource;
import com.mybatisflex.core.datasource.FlexDataSource;
import com.mybatisflex.core.mybatis.FlexConfiguration;
import com.mybatisflex.core.mybatis.FlexSqlSessionFactoryBuilder;
import org.apache.ibatis.logging.Log;
@ -36,7 +36,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* MybatisFlex 的启动类
@ -56,10 +55,10 @@ public class MybatisFlexBootstrap {
protected final AtomicBoolean started = new AtomicBoolean(false);
protected String environmentId;
protected String environmentId = FlexConsts.NAME;
protected TransactionFactory transactionFactory;
protected DataSource dataSource;
protected FlexDataSource dataSource;
protected Configuration configuration;
protected List<Class<?>> mappers;
@ -67,14 +66,13 @@ public class MybatisFlexBootstrap {
protected Class<? extends Log> logImpl;
private Map<Class<?>, Object> mapperObjects = new ConcurrentHashMap<>();
private ThreadLocal<SqlSession> sessionThreadLocal = new ThreadLocal<>();
/**
* 虽然提供了 getInstance但也允许用户进行实例化
* 用于创建多个 MybatisFlexBootstrap 实例达到管理多数据源的目的
*/
public MybatisFlexBootstrap(String environmentId) {
this.environmentId = environmentId;
public MybatisFlexBootstrap() {
}
private static volatile MybatisFlexBootstrap instance;
@ -83,7 +81,7 @@ public class MybatisFlexBootstrap {
if (instance == null) {
synchronized (MybatisFlexBootstrap.class) {
if (instance == null) {
instance = new MybatisFlexBootstrap(FlexConsts.NAME);
instance = new MybatisFlexBootstrap();
}
}
}
@ -124,7 +122,6 @@ public class MybatisFlexBootstrap {
//init sqlSessionFactory
this.sqlSessionFactory = new FlexSqlSessionFactoryBuilder().build(configuration);
//init mappers
if (mappers != null) {
mappers.forEach(configuration::addMapper);
@ -147,10 +144,6 @@ public class MybatisFlexBootstrap {
protected SqlSession openSession() {
SqlSession sqlSession = sessionThreadLocal.get();
if (sqlSession != null) {
return sqlSession;
}
return sqlSessionFactory.openSession(configuration.getDefaultExecutorType(), true);
}
@ -175,34 +168,6 @@ public class MybatisFlexBootstrap {
}
/**
* 执行事务操作不支持嵌套事务
*
* @param supplier
* @return false 回滚事务true 正常执行
*/
public boolean tx(Supplier<Boolean> supplier) {
SqlSession sqlSession = sqlSessionFactory.openSession(configuration.getDefaultExecutorType());
boolean success = false;
boolean rollback = true;
try {
sessionThreadLocal.set(sqlSession);
success = supplier.get();
} catch (Throwable e) {
rollback = false;
sqlSession.rollback();
} finally {
sessionThreadLocal.remove();
if (!success && rollback) {
sqlSession.rollback();
} else if (success) {
sqlSession.commit();
}
}
return success;
}
public String getEnvironmentId() {
return environmentId;
}
@ -226,18 +191,15 @@ public class MybatisFlexBootstrap {
}
public MybatisFlexBootstrap setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
this.dataSource = new FlexDataSource(FlexConsts.NAME, dataSource);
return this;
}
public MybatisFlexBootstrap addDataSource(String dataSourceKey, DataSource dataSource) {
if (this.dataSource == null) {
this.dataSource = new RoutingDataSource(dataSourceKey, dataSource);
} else if (this.dataSource instanceof RoutingDataSource) {
((RoutingDataSource) this.dataSource).addDataSource(dataSourceKey, dataSource);
this.dataSource = new FlexDataSource(dataSourceKey, dataSource);
} else {
this.dataSource = new RoutingDataSource("default", this.dataSource);
((RoutingDataSource) this.dataSource).addDataSource(dataSourceKey, dataSource);
this.dataSource.addDataSource(dataSourceKey, dataSource);
}
return this;
}

View File

@ -17,6 +17,8 @@ package com.mybatisflex.core.datasource;
import com.mybatisflex.core.dialect.DbType;
import com.mybatisflex.core.dialect.DbTypeUtil;
import com.mybatisflex.core.transaction.TransactionContext;
import com.mybatisflex.core.transaction.TransactionalManager;
import com.mybatisflex.core.util.StringUtil;
import javax.sql.DataSource;
@ -25,13 +27,16 @@ import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
public class RoutingDataSource extends AbstractDataSource {
public class FlexDataSource extends AbstractDataSource {
private final Map<String, DataSource> dataSourceMap = new HashMap<>();
private final Map<String, DbType> dbTypeHashMap = new HashMap<>();
private final String defaultDataSourceKey;
private final DataSource defaultDataSource;
public RoutingDataSource(String dataSourceKey, DataSource dataSource) {
public FlexDataSource(String dataSourceKey, DataSource dataSource) {
this.defaultDataSourceKey = dataSourceKey;
this.defaultDataSource = dataSource;
dataSourceMap.put(dataSourceKey, dataSource);
dbTypeHashMap.put(dataSourceKey, DbTypeUtil.getDbType(dataSource));
@ -42,20 +47,55 @@ public class RoutingDataSource extends AbstractDataSource {
dbTypeHashMap.put(dataSourceKey, DbTypeUtil.getDbType(dataSource));
}
public DbType getDbType(String dataSourceKey){
public DbType getDbType(String dataSourceKey) {
return dbTypeHashMap.get(dataSourceKey);
}
@Override
public Connection getConnection() throws SQLException {
return getDataSource().getConnection();
String xid = TransactionContext.getXID();
if (StringUtil.isNotBlank(xid)) {
String dataSourceKey = DataSourceKey.get();
if (StringUtil.isBlank(dataSourceKey)) {
dataSourceKey = defaultDataSourceKey;
}
Connection connection = TransactionalManager.getConnection(xid, dataSourceKey);
if (connection != null) {
return connection;
} else {
connection = getDataSource().getConnection();
TransactionalManager.hold(xid, dataSourceKey, connection);
return connection;
}
} else {
return getDataSource().getConnection();
}
}
@Override
public Connection getConnection(String username, String password) throws SQLException {
return getDataSource().getConnection(username, password);
String xid = TransactionContext.getXID();
if (StringUtil.isNotBlank(xid)) {
String dataSourceKey = DataSourceKey.get();
if (StringUtil.isBlank(dataSourceKey)) {
dataSourceKey = defaultDataSourceKey;
}
Connection connection = TransactionalManager.getConnection(xid, dataSourceKey);
if (connection != null) {
return connection;
} else {
connection = getDataSource().getConnection(username, password);
TransactionalManager.hold(xid, dataSourceKey, connection);
return connection;
}
} else {
return getDataSource().getConnection(username, password);
}
}
@Override
@SuppressWarnings("unchecked")
public <T> T unwrap(Class<T> iface) throws SQLException {
@ -70,6 +110,7 @@ public class RoutingDataSource extends AbstractDataSource {
return (iface.isInstance(this) || getDataSource().isWrapperFor(iface));
}
private DataSource getDataSource() {
DataSource dataSource = defaultDataSource;
if (dataSourceMap.size() > 1) {

View File

@ -18,7 +18,7 @@ package com.mybatisflex.core.mybatis;
import com.mybatisflex.annotation.UseDataSource;
import com.mybatisflex.core.FlexGlobalConfig;
import com.mybatisflex.core.datasource.DataSourceKey;
import com.mybatisflex.core.datasource.RoutingDataSource;
import com.mybatisflex.core.datasource.FlexDataSource;
import com.mybatisflex.core.dialect.DbType;
import com.mybatisflex.core.dialect.DialectFactory;
import com.mybatisflex.core.row.RowMapper;
@ -28,7 +28,6 @@ import com.mybatisflex.core.util.StringUtil;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.util.MapUtil;
import javax.sql.DataSource;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.Map;
@ -39,11 +38,11 @@ public class MapperInvocationHandler implements InvocationHandler {
private static final Map<Method, String> methodDataSourceKeyMap = new ConcurrentHashMap<>();
private final Object mapper;
private final DataSource dataSource;
private final FlexDataSource dataSource;
public MapperInvocationHandler(Object mapper, Configuration configuration) {
this.mapper = mapper;
this.dataSource = configuration.getEnvironment().getDataSource();
this.dataSource = (FlexDataSource) configuration.getEnvironment().getDataSource();
}
@ -67,8 +66,8 @@ public class MapperInvocationHandler implements InvocationHandler {
//优先获取用户自己配置的 dbType
DbType dbType = DialectFactory.getHintDbType();
if (dbType == null) {
if (dataSourceKey != null && dataSource instanceof RoutingDataSource) {
dbType = ((RoutingDataSource) dataSource).getDbType(dataSourceKey);
if (dataSourceKey != null) {
dbType = dataSource.getDbType(dataSourceKey);
}
if (dbType == null) {
dbType = FlexGlobalConfig.getDefaultConfig().getDbType();

View File

@ -18,13 +18,17 @@ package com.mybatisflex.core.row;
import com.mybatisflex.core.FlexGlobalConfig;
import com.mybatisflex.core.paginate.Page;
import com.mybatisflex.core.query.QueryWrapper;
import com.mybatisflex.core.transaction.TransactionContext;
import com.mybatisflex.core.transaction.TransactionalManager;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.util.MapUtil;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
/**
* 针对 RowMapper 的静态方法进行封装
@ -387,4 +391,36 @@ public class Db {
public static Page<Row> paginate(String tableName, Page<Row> page, QueryWrapper queryWrapper) {
return invoker().paginate(tableName, page, queryWrapper);
}
public static boolean tx(Supplier<Boolean> supplier) {
//上一级事务的id支持事务嵌套
String prevXID = TransactionContext.getXID();
try {
String xid = UUID.randomUUID().toString();
TransactionContext.hold(xid);
boolean success = false;
boolean rollbacked = false;
try {
success = supplier.get();
} catch (Exception e) {
rollbacked = true;
TransactionalManager.rollback(xid);
e.printStackTrace();
} finally {
if (success) {
TransactionalManager.commit(xid);
} else if (!rollbacked) {
TransactionalManager.rollback(xid);
}
TransactionContext.release();
}
return success;
} finally {
//恢复上一级事务
if (prevXID != null) {
TransactionContext.hold(prevXID);
}
}
}
}

View File

@ -0,0 +1,36 @@
/**
* Copyright (c) 2022-2023, Mybatis-Flex (fuhai999@gmail.com).
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mybatisflex.core.transaction;
public class TransactionContext {
private static final ThreadLocal<String> XID_HOLDER = new ThreadLocal<>();
public static String getXID() {
return XID_HOLDER.get();
}
public static void release() {
XID_HOLDER.remove();
}
public static void hold(String xid) {
XID_HOLDER.set(xid);
}
}

View File

@ -0,0 +1,109 @@
/**
* Copyright (c) 2022-2023, Mybatis-Flex (fuhai999@gmail.com).
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mybatisflex.core.transaction;
import org.apache.ibatis.logging.Log;
import org.apache.ibatis.logging.LogFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 事务管理器
*/
public class TransactionalManager {
private static final Log log = LogFactory.getLog(TransactionalManager.class);
//<xid : <datasource : connection>>
private static final ThreadLocal<Map<String, Map<String, Connection>>> CONNECTION_HOLDER
= ThreadLocal.withInitial(ConcurrentHashMap::new);
public static void hold(String xid, String ds, Connection connection) {
Map<String, Map<String, Connection>> holdMap = CONNECTION_HOLDER.get();
Map<String, Connection> connMap = holdMap.get(xid);
if (connMap == null) {
connMap = new ConcurrentHashMap<>();
holdMap.put(xid, connMap);
}
if (connMap.containsKey(ds)) {
return;
}
try {
connection.setAutoCommit(false);
} catch (SQLException e) {
if (log.isDebugEnabled()) {
log.debug("Error set AutoCommit to false. Cause: " + e);
}
}
}
public static Connection getConnection(String xid, String ds) {
Map<String, Connection> connections = CONNECTION_HOLDER.get().get(xid);
return connections == null || connections.isEmpty() ? null : connections.get(ds);
}
public static void commit(String xid) {
release(xid, true);
}
public static void rollback(String xid) {
release(xid, false);
}
private static void release(String xid, boolean commit) {
Exception exception = null;
Map<String, Map<String, Connection>> holdMap = CONNECTION_HOLDER.get();
try {
if (holdMap.isEmpty()) {
return;
}
Map<String, Connection> connections = holdMap.get(xid);
for (Connection conn : connections.values()) {
try {
if (commit) {
conn.commit();
} else {
conn.rollback();
}
} catch (SQLException e) {
exception = e;
} finally {
try {
conn.close();
} catch (SQLException e) {
//ignore
}
}
}
} finally {
holdMap.remove(xid);
if (holdMap.isEmpty()) {
CONNECTION_HOLDER.remove();
}
if (exception != null) {
log.error("TransactionalManager.release() is error. cause: " + exception.getMessage(), exception);
}
}
}
}

View File

@ -16,7 +16,7 @@
package com.mybatisflex.spring.boot;
import com.mybatisflex.core.datasource.DataSourceBuilder;
import com.mybatisflex.core.datasource.RoutingDataSource;
import com.mybatisflex.core.datasource.FlexDataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
@ -52,13 +52,13 @@ public class MultiDataSourceAutoConfiguration {
@ConditionalOnMissingBean
public DataSource dataSource() {
RoutingDataSource routingDataSource = null;
FlexDataSource routingDataSource = null;
if (dataSourceProperties != null && !dataSourceProperties.isEmpty()) {
for (String key : dataSourceProperties.keySet()) {
DataSource dataSource = new DataSourceBuilder(dataSourceProperties.get(key)).build();
if (routingDataSource == null) {
routingDataSource = new RoutingDataSource(key, dataSource);
routingDataSource = new FlexDataSource(key, dataSource);
} else {
routingDataSource.addDataSource(key, dataSource);
}

View File

@ -16,6 +16,7 @@
package com.mybatisflex.spring;
import com.mybatisflex.core.FlexConsts;
import com.mybatisflex.core.datasource.FlexDataSource;
import com.mybatisflex.core.mybatis.FlexConfiguration;
import com.mybatisflex.core.mybatis.FlexSqlSessionFactoryBuilder;
import com.mybatisflex.core.mybatis.FlexXMLConfigBuilder;
@ -582,7 +583,8 @@ public class FlexSqlSessionFactoryBean extends SqlSessionFactoryBean
}
targetConfiguration.setEnvironment(new Environment(this.environment,
this.transactionFactory == null ? new SpringManagedTransactionFactory() : this.transactionFactory, dataSource));
this.transactionFactory == null ? new SpringManagedTransactionFactory() : this.transactionFactory,
dataSource instanceof FlexDataSource ? dataSource : new FlexDataSource(FlexConsts.NAME, dataSource)));
if (this.mapperLocations != null) {
if (this.mapperLocations.length == 0) {