Spring Boot × MyBatis × HikariCPで複数データソースを動的 (透過的) に利用する方法
前置き
以前、静的な方法で複数データソースを利用する方法を記事にしました。
今回は透過的に複数データソースを利用する方法について記述します。
透過的とは、リクエスト時のユーザ情報とか、アクセス時間とか、何らかのパラメータを元にアクセスするデータソースを決定することです。
プレミアムユーザなら特別なデータソースを読み込むとか、簡単なシャーディングを実現したりできます。
ただし、実装が複雑になるため、特別な理由がない限り静的な方法を採用したほうが良いと思います。
実装方針
以前と同様に、データソースはOracleとMySQLとします。今回はDao Classにアノテーションを設定して、 特定のDao Classにアクセスした際にデータソースを決定するコードを実装します。
実装レシピ
- DatasourceとTxManagerをBean登録
- AbstractRoutingDataSourceのOverrideしてBean登録
- SpringManagedTransactionを継承したclassを作成
- SpringManagedTransactionFactoryを継承したclassを作成
- MybatisのSqlSessionFactoryをBean登録
- DatabaseContextHolderを更新するInterceptorを作成
application.yml等については、以前の記事も参照ください。
データソース定義
データソース定義は以前と同じです。DatasourceとTransactionManagerをBeanに登録します。
@EnableMBeanExport(registration = RegistrationPolicy.IGNORE_EXISTING) @Configuration class OracleMasterDatasourceConfig { @Bean(name = [ORACLE_MASTER]) @Primary fun oracleMasterDataSource( @Qualifier(HIKARI_CONFIG) hikariConfig: HikariConfig ): DataSource { return HikariDataSource(hikariConfig) } @Bean(name = [HIKARI_CONFIG]) @ConfigurationProperties(prefix = HIKARI_CONFIGURATION_PROPERTIES) fun oracleHikariConfig(): HikariConfig { return HikariConfig() } @Bean(name = [TX_MANAGER]) @Primary fun oracleMasterTxManager( @Qualifier(ORACLE_MASTER) dataSource: DataSource, transactionProperties: TransactionProperties ): PlatformTransactionManager { val transactionManager = DataSourceTransactionManager(dataSource) transactionProperties.customize(transactionManager) return transactionManager } companion object { private const val HIKARI_CONFIGURATION_PROPERTIES = "datasource.oracle.master.hikari" private const val HIKARI_CONFIG = "oracleMasterHikari" private const val TX_MANAGER = "oracleMasterTxManager" } }
MySQLの定義も同様です。Primaryだけ外します。
AbstractRoutingDataSourceをOverrideしてBean登録
複数データソースをlookupするために、AbstractRoutingDataSourceをoverrideします。
class DynamicRoutingDatasourceResolver : AbstractRoutingDataSource(), InfrastructureProxy { override fun determineCurrentLookupKey(): String = DatabaseContextHolder.getDatabase() // dataSourceからconnectionを取得する override fun getConnection(): Connection = DataSourceUtils.getConnection(determineTargetDataSource()) // 現在のdatasourceを返す // protectedのdetermineTargetDataSourceをDynamicRoutingDatasourceTransactionで読めるように fun getTargetDatasource(): DataSource = determineTargetDataSource() // 透過的なデータソース選択でTransactionを実装する際に必要 override fun getWrappedObject(): Any { return determineTargetDataSource() } }
determineCurrentLookupKeyをoverrideすることで、現在参照しているDatasourceのkeyを返却します。
また、Transaction実装が必要な場合、
org.springframework.core.InfrastructureProxy
を実装する必要があります。(後述)
DatabaseContextHolderの実装はこちら
class DatabaseContextHolder { companion object { @JvmStatic private val contextHolder = ThreadLocal<String>() @JvmStatic fun setDatabase(database: String) { contextHolder.set(database) } @JvmStatic fun getDatabase(): String = contextHolder.get() @JvmStatic fun clearDatabase() = contextHolder.remove() } }
keyはThreadLocalで保持します。
最後にdynamicRoutingDatasourceResolverをBeanに登録します。
@Configuration class DynamicRoutingDatasourceConfiguration { @Bean fun dynamicRoutingDatasourceResolver( @Qualifier(ORACLE_MASTER) oracleDatasource: DataSource, @Qualifier(MYSQL_MASTER) mysqlDatasource: DataSource ): DynamicRoutingDatasourceResolver { val resolver = DynamicRoutingDatasourceResolver() val map = mapOf<Any, Any>(ORACLE_MASTER to oracleDatasource, MYSQL_MASTER to mysqlDatasource) resolver.setTargetDataSources(map) // resolver.setDefaultTargetDataSource(oracleDatasource) return resolver } }
SpringManagedTransactionを継承したclassを作成
import com.example.demo.infra.database.configuration.DynamicRoutingDatasourceResolver import org.mybatis.spring.transaction.SpringManagedTransaction import org.springframework.jdbc.datasource.ConnectionHolder import org.springframework.jdbc.datasource.DataSourceUtils import org.springframework.transaction.support.TransactionSynchronizationManager import java.sql.Connection import javax.sql.DataSource class DynamicRoutingDatasourceTransaction( private val dynamicRoutingDatasourceResolver: DynamicRoutingDatasourceResolver ) : SpringManagedTransaction(dynamicRoutingDatasourceResolver) { // transaction境界内で利用したdatasourceとconnectionを保持する // transaction管理していない場合、mapのサイズは常に1になる private val datasourceConnections = mutableMapOf<DataSource, DataSourceConnection>() override fun getConnection(): Connection { val datasource = dynamicRoutingDatasourceResolver.getTargetDatasource() if (datasourceConnections.containsKey(datasource)) { return datasourceConnections[datasource]?.getConnection() ?: throw NullPointerException() } // 現在のdatasourceのconnectionを返す val connection = dynamicRoutingDatasourceResolver.connection datasourceConnections[datasource] = DataSourceConnection(connection, datasource) return connection } override fun commit() { for (datasourceConnection in datasourceConnections.values) { // transaction制御対象外のものはここでcommit if (!datasourceConnection.isTransactional()) { datasourceConnection.commit() } } } override fun rollback() { for (datasourceConnection in datasourceConnections.values) { // transaction制御対象外のものはここでrollback if (!datasourceConnection.isTransactional()) { datasourceConnection.rollback() } } } override fun close() { datasourceConnections.values.forEach { it.close() } } override fun getTimeout(): Int? { for (datasourceConnection in datasourceConnections.values) { val connectionHolder = datasourceConnection.connectionHolder() if (connectionHolder != null && connectionHolder.hasTimeout()) { return connectionHolder.timeToLiveInSeconds } } return null } private class DataSourceConnection( private val connection: Connection, private val datasource: DataSource ) { internal fun getConnection() = connection internal fun isTransactional(): Boolean { if (connection.autoCommit) { return false } return DataSourceUtils.isConnectionTransactional(connection, datasource) } internal fun commit() { connection.commit() } internal fun rollback() { connection.rollback() } internal fun close() { DataSourceUtils.releaseConnection(connection, datasource) } internal fun connectionHolder(): ConnectionHolder? = TransactionSynchronizationManager.getResource(datasource) as ConnectionHolder? } }
SpringManagedTransactionFactoryを継承したclassを作成
class DynamicRoutingDatasourceTransactionFactory : SpringManagedTransactionFactory() { override fun newTransaction( dataSource: DataSource?, level: TransactionIsolationLevel?, autoCommit: Boolean ): Transaction { if (dataSource is DynamicRoutingDatasourceResolver) { return DynamicRoutingDatasourceTransaction(dataSource) } return super.newTransaction(dataSource, level, autoCommit) } }
DynamicRoutingDatasourceResolverの場合に先程定義したDynamicRoutingTransactionを返します。
MybatisのSqlSessionFactoryをBean登録
@Configuration @MapperScan( basePackages = ["com.example.demo.infra.mapper"], annotationClass = Mapper::class ) class DynamicSqlSessionFactory { @Bean fun sqlSessionFactory( dynamicRoutingDatasourceResolver: DynamicRoutingDatasourceResolver, mybatisProperties: MybatisProperties ): SqlSessionFactory { val sqlSessionFactoryBean = SqlSessionFactoryBean() sqlSessionFactoryBean.setDataSource(dynamicRoutingDatasourceResolver) sqlSessionFactoryBean.setMapperLocations(mybatisProperties.resolveMapperLocations()) // default config val configuration = org.apache.ibatis.session.Configuration() mybatisProperties.configuration?.let { configuration.jdbcTypeForNull = it.jdbcTypeForNull configuration.isCacheEnabled = it.isCacheEnabled configuration.defaultExecutorType = it.defaultExecutorType configuration.defaultFetchSize = it.defaultFetchSize configuration.defaultStatementTimeout = it.defaultStatementTimeout configuration.isMapUnderscoreToCamelCase = true } sqlSessionFactoryBean.setConfiguration(configuration) // transaction factory sqlSessionFactoryBean.setTransactionFactory(DynamicRoutingDatasourceTransactionFactory()) return sqlSessionFactoryBean.`object` ?: throw NullPointerException() } }
datasourceにdynamicRoutingDatasourceResolverを、 transactionFactoryにdynamicRoutingDatasourceTransactionFactoryを設定します。
DatabaseContextHolderを更新するInterceptorを作成
@Aspect @Component class DynamicDatasourceInterceptor { @Around("execution(* com.example.demo.infra.dao..*.*(..)) && @target(datasourceComponent)") fun advice(proceedingJoinPoint: ProceedingJoinPoint, datasourceComponent: DatasourceComponent): Any? { logger.debug("select database: ${datasourceComponent.db}") DatabaseContextHolder.setDatabase(datasourceComponent.db) val result = proceedingJoinPoint.proceed() DatabaseContextHolder.clearDatabase() return result } } @Component @Target(AnnotationTarget.CLASS) annotation class DatasourceComponent( val db: String )
AOPで、annotationを設定したDao classにアクセスした際にDatabaseContextHolderが更新されて利用するデータソースを決定します。
Dao sample
@DatasourceComponent(ORACLE_MASTER) class SampleDaoImpl : AbstractDao(), SampleDao { override fun update() { sqlSession.getMapper(SampleMapper::class.java).update() } override fun find(): List<SampleDto> = sqlSession.getMapper(SampleMapper::class.java).find() }
import org.apache.ibatis.session.SqlSession import org.springframework.beans.factory.annotation.Autowired abstract class AbstractDao { @Autowired protected lateinit var sqlSession: SqlSession }
ここまでで一応完成です。
InfrastructureProxyの実装について
前述したように、Transactionを行う場合はorg.springframework.core.InfrastructureProxy
を実装する必要があります。
これは、例えばSqlException等の例外が発生した際にMybatisから返されるDatasourceが
こちらがoverrideしたlookUpKeyで決定される前のdatasource (今回ではdynamicRoutingDatasourceResolver) のため、例外処理内で実際に使用しているDatasourceを特定できなくなるためです。
そのため、InfrastructureProxyを実装して、現在参照しているDatasourceを返却する必要があります。
流れとしては、例外発生時、MybatisがSqlExceptionをDataAccessExceptionに変換しようとして、 その処理内で現在のDatasource / Connectionを取得するために、最終的に以下にいきます。 https://github.com/spring-projects/spring-framework/blob/master/spring-tx/src/main/java/org/springframework/transaction/support/TransactionSynchronizationUtils.java#L66
ここでInfrastructureProxyを実装していないと、dynamicRoutingDatasourceResolverが返されます。
すると、以下でConnectionHolderがnullで返され、新しいConnectionが生成されてしまいます。
https://github.com/spring-projects/spring-framework/blob/master/spring-jdbc/src/main/java/org/springframework/jdbc/datasource/DataSourceUtils.java#L103
現在保持している本当のconnectionはリリースされてcloseされます。 そして、Springはすでにclose されたconnectionについてrollback等の処理を開始するためConnection is closedと怒られてしまいます。
java.sql.SQLException: Connection is closed at com.zaxxer.hikari.pool.ProxyConnection$ClosedConnection.lambda$getClosedConnection$0(ProxyConnection.java:489) at com.sun.proxy.$Proxy89.isReadOnly(Unknown Source) at com.zaxxer.hikari.pool.HikariProxyConnection.isReadOnly(HikariProxyConnection.java) at org.springframework.jdbc.datasource.DataSourceUtils.resetConnectionAfterTransaction(DataSourceUtils.java:237) at org.springframework.jdbc.datasource.DataSourceTransactionManager.doCleanupAfterCompletion(DataSourceTransactionManager.java:376) at org.springframework.transaction.support.AbstractPlatformTransactionManager.cleanupAfterCompletion(AbstractPlatformTransactionManager.java:1007) at org.springframework.transaction.support.AbstractPlatformTransactionManager.processRollback(AbstractPlatformTransactionManager.java:878) at org.springframework.transaction.support.AbstractPlatformTransactionManager.rollback(AbstractPlatformTransactionManager.java:812) at org.springframework.transaction.interceptor.TransactionAspectSupport.completeTransactionAfterThrowing(TransactionAspectSupport.java:551) at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:298) at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688) at com.example.demo.application.usecase.SampleFacade$$EnhancerBySpringCGLIB$$668ad13.update(<generated>) at com.example.demo.web.controller.SampleController.update(SampleController.kt:21) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:189) at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:892) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797) at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1038) at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942) at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005) at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:897) at javax.servlet.http.HttpServlet.service(HttpServlet.java:634) at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882) at javax.servlet.http.HttpServlet.service(HttpServlet.java:741) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:200) at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490) at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139) at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408) at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:834) at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1415) at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) at java.lang.Thread.run(Thread.java:748)
上記の実装についてはサンプルが少なかったので、本当にこれで正しいかはちょっと不安ですが。。説明的にも不安。
まとめ
Spring Boot × MyBatis × HikariCPでの透過的な複数データソース実装についてまとめました。
どうしても複雑化するので、必要がない限りは静的な実装を利用したほうが良いかなと思います!