すきま風

勉強したことのメモとか

Spring Boot × MyBatis × HikariCPで複数データソースを動的 (透過的) に利用する方法

前置き

以前、静的な方法で複数データソースを利用する方法を記事にしました。 今回は透過的に複数データソースを利用する方法について記述します。 透過的とは、リクエスト時のユーザ情報とか、アクセス時間とか、何らかのパラメータを元にアクセスするデータソースを決定することです。 プレミアムユーザなら特別なデータソースを読み込むとか、簡単なシャーディングを実現したりできます。
ただし、実装が複雑になるため、特別な理由がない限り静的な方法を採用したほうが良いと思います。

実装方針

以前と同様に、データソースはOracleMySQLとします。今回はDao Classにアノテーションを設定して、 特定のDao Classにアクセスした際にデータソースを決定するコードを実装します。

実装レシピ

  • DatasourceとTxManagerをBean登録
  • AbstractRoutingDataSourceのOverrideしてBean登録
  • SpringManagedTransactionを継承したclassを作成
  • SpringManagedTransactionFactoryを継承したclassを作成
  • MybatisのSqlSessionFactoryをBean登録
  • DatabaseContextHolderを更新するInterceptorを作成

application.yml等については、以前の記事も参照ください。

データソース定義

データソース定義は以前と同じです。DatasourceとTransactionManagerをBeanに登録します。

@EnableMBeanExport(registration = RegistrationPolicy.IGNORE_EXISTING)
@Configuration
class OracleMasterDatasourceConfig {

    @Bean(name = [ORACLE_MASTER])
    @Primary
    fun oracleMasterDataSource(
        @Qualifier(HIKARI_CONFIG) hikariConfig: HikariConfig
    ): DataSource {
        return HikariDataSource(hikariConfig)
    }

    @Bean(name = [HIKARI_CONFIG])
    @ConfigurationProperties(prefix = HIKARI_CONFIGURATION_PROPERTIES)
    fun oracleHikariConfig(): HikariConfig {
        return HikariConfig()
    }

    @Bean(name = [TX_MANAGER])
    @Primary
    fun oracleMasterTxManager(
        @Qualifier(ORACLE_MASTER) dataSource: DataSource,
        transactionProperties: TransactionProperties
    ): PlatformTransactionManager {
        val transactionManager = DataSourceTransactionManager(dataSource)

        transactionProperties.customize(transactionManager)

        return transactionManager
    }

    companion object {
        private const val HIKARI_CONFIGURATION_PROPERTIES = "datasource.oracle.master.hikari"

        private const val HIKARI_CONFIG = "oracleMasterHikari"

        private const val TX_MANAGER = "oracleMasterTxManager"
    }
}

MySQLの定義も同様です。Primaryだけ外します。

AbstractRoutingDataSourceをOverrideしてBean登録

複数データソースをlookupするために、AbstractRoutingDataSourceをoverrideします。

class DynamicRoutingDatasourceResolver : AbstractRoutingDataSource(), InfrastructureProxy {
    override fun determineCurrentLookupKey(): String =
        DatabaseContextHolder.getDatabase()

    // dataSourceからconnectionを取得する
    override fun getConnection(): Connection = DataSourceUtils.getConnection(determineTargetDataSource())

    // 現在のdatasourceを返す
    // protectedのdetermineTargetDataSourceをDynamicRoutingDatasourceTransactionで読めるように
    fun getTargetDatasource(): DataSource = determineTargetDataSource()

    // 透過的なデータソース選択でTransactionを実装する際に必要
    override fun getWrappedObject(): Any {
        return determineTargetDataSource()
    }
}

determineCurrentLookupKeyをoverrideすることで、現在参照しているDatasourceのkeyを返却します。 また、Transaction実装が必要な場合、 org.springframework.core.InfrastructureProxyを実装する必要があります。(後述)

DatabaseContextHolderの実装はこちら

class DatabaseContextHolder {
    companion object {
        @JvmStatic
        private val contextHolder = ThreadLocal<String>()

        @JvmStatic
        fun setDatabase(database: String) {
            contextHolder.set(database)
        }

        @JvmStatic
        fun getDatabase(): String = contextHolder.get()

        @JvmStatic
        fun clearDatabase() = contextHolder.remove()
    }
}

keyはThreadLocalで保持します。
最後にdynamicRoutingDatasourceResolverをBeanに登録します。

@Configuration
class DynamicRoutingDatasourceConfiguration {
    @Bean
    fun dynamicRoutingDatasourceResolver(
        @Qualifier(ORACLE_MASTER) oracleDatasource: DataSource,
        @Qualifier(MYSQL_MASTER) mysqlDatasource: DataSource
    ): DynamicRoutingDatasourceResolver {
        val resolver = DynamicRoutingDatasourceResolver()
        val map = mapOf<Any, Any>(ORACLE_MASTER to oracleDatasource, MYSQL_MASTER to mysqlDatasource)

        resolver.setTargetDataSources(map)
        // resolver.setDefaultTargetDataSource(oracleDatasource)

        return resolver
    }
}

SpringManagedTransactionを継承したclassを作成

import com.example.demo.infra.database.configuration.DynamicRoutingDatasourceResolver
import org.mybatis.spring.transaction.SpringManagedTransaction
import org.springframework.jdbc.datasource.ConnectionHolder
import org.springframework.jdbc.datasource.DataSourceUtils
import org.springframework.transaction.support.TransactionSynchronizationManager
import java.sql.Connection
import javax.sql.DataSource

class DynamicRoutingDatasourceTransaction(
    private val dynamicRoutingDatasourceResolver: DynamicRoutingDatasourceResolver
) : SpringManagedTransaction(dynamicRoutingDatasourceResolver) {

    // transaction境界内で利用したdatasourceとconnectionを保持する
    // transaction管理していない場合、mapのサイズは常に1になる
    private val datasourceConnections = mutableMapOf<DataSource, DataSourceConnection>()

    override fun getConnection(): Connection {
        val datasource = dynamicRoutingDatasourceResolver.getTargetDatasource()

        if (datasourceConnections.containsKey(datasource)) {
            return datasourceConnections[datasource]?.getConnection() ?: throw NullPointerException()
        }

        // 現在のdatasourceのconnectionを返す
        val connection = dynamicRoutingDatasourceResolver.connection

        datasourceConnections[datasource] = DataSourceConnection(connection, datasource)

        return connection
    }

    override fun commit() {
        for (datasourceConnection in datasourceConnections.values) {
            // transaction制御対象外のものはここでcommit
            if (!datasourceConnection.isTransactional()) {
                datasourceConnection.commit()
            }
        }
    }

    override fun rollback() {
        for (datasourceConnection in datasourceConnections.values) {
            // transaction制御対象外のものはここでrollback
            if (!datasourceConnection.isTransactional()) {
                datasourceConnection.rollback()
            }
        }
    }

    override fun close() {
        datasourceConnections.values.forEach {
            it.close()
        }
    }

    override fun getTimeout(): Int? {
        for (datasourceConnection in datasourceConnections.values) {
            val connectionHolder = datasourceConnection.connectionHolder()
            if (connectionHolder != null && connectionHolder.hasTimeout()) {
                return connectionHolder.timeToLiveInSeconds
            }
        }

        return null
    }

    private class DataSourceConnection(
        private val connection: Connection,
        private val datasource: DataSource
    ) {
        internal fun getConnection() = connection

        internal fun isTransactional(): Boolean {
            if (connection.autoCommit) {
                return false
            }

            return DataSourceUtils.isConnectionTransactional(connection, datasource)
        }

        internal fun commit() {
            connection.commit()
        }

        internal fun rollback() {
            connection.rollback()
        }

        internal fun close() {
            DataSourceUtils.releaseConnection(connection, datasource)
        }

        internal fun connectionHolder(): ConnectionHolder? =
            TransactionSynchronizationManager.getResource(datasource) as ConnectionHolder?
    }
}

SpringManagedTransactionFactoryを継承したclassを作成

class DynamicRoutingDatasourceTransactionFactory : SpringManagedTransactionFactory() {
    override fun newTransaction(
        dataSource: DataSource?,
        level: TransactionIsolationLevel?,
        autoCommit: Boolean
    ): Transaction {
        if (dataSource is DynamicRoutingDatasourceResolver) {
            return DynamicRoutingDatasourceTransaction(dataSource)
        }
        return super.newTransaction(dataSource, level, autoCommit)
    }
}

DynamicRoutingDatasourceResolverの場合に先程定義したDynamicRoutingTransactionを返します。

MybatisのSqlSessionFactoryをBean登録

@Configuration
@MapperScan(
    basePackages = ["com.example.demo.infra.mapper"],
    annotationClass = Mapper::class
)
class DynamicSqlSessionFactory {
    @Bean
    fun sqlSessionFactory(
        dynamicRoutingDatasourceResolver: DynamicRoutingDatasourceResolver,
        mybatisProperties: MybatisProperties
    ): SqlSessionFactory {
        val sqlSessionFactoryBean = SqlSessionFactoryBean()

        sqlSessionFactoryBean.setDataSource(dynamicRoutingDatasourceResolver)
        sqlSessionFactoryBean.setMapperLocations(mybatisProperties.resolveMapperLocations())

        // default config
        val configuration = org.apache.ibatis.session.Configuration()
        mybatisProperties.configuration?.let {
            configuration.jdbcTypeForNull = it.jdbcTypeForNull

            configuration.isCacheEnabled = it.isCacheEnabled
            configuration.defaultExecutorType = it.defaultExecutorType
            configuration.defaultFetchSize = it.defaultFetchSize
            configuration.defaultStatementTimeout = it.defaultStatementTimeout
            configuration.isMapUnderscoreToCamelCase = true
        }
        sqlSessionFactoryBean.setConfiguration(configuration)

        // transaction factory
        sqlSessionFactoryBean.setTransactionFactory(DynamicRoutingDatasourceTransactionFactory())

        return sqlSessionFactoryBean.`object` ?: throw NullPointerException()
    }
}

datasourceにdynamicRoutingDatasourceResolverを、 transactionFactoryにdynamicRoutingDatasourceTransactionFactoryを設定します。

DatabaseContextHolderを更新するInterceptorを作成

@Aspect
@Component
class DynamicDatasourceInterceptor {
    @Around("execution(* com.example.demo.infra.dao..*.*(..)) && @target(datasourceComponent)")
    fun advice(proceedingJoinPoint: ProceedingJoinPoint, datasourceComponent: DatasourceComponent): Any? {

        logger.debug("select database: ${datasourceComponent.db}")

        DatabaseContextHolder.setDatabase(datasourceComponent.db)

        val result = proceedingJoinPoint.proceed()

        DatabaseContextHolder.clearDatabase()

        return result
    }
}

@Component
@Target(AnnotationTarget.CLASS)
annotation class DatasourceComponent(
    val db: String
)

AOPで、annotationを設定したDao classにアクセスした際にDatabaseContextHolderが更新されて利用するデータソースを決定します。

Dao sample

@DatasourceComponent(ORACLE_MASTER)
class SampleDaoImpl : AbstractDao(), SampleDao {
    override fun update() {
        sqlSession.getMapper(SampleMapper::class.java).update()
    }

    override fun find(): List<SampleDto> =
        sqlSession.getMapper(SampleMapper::class.java).find()
}

import org.apache.ibatis.session.SqlSession
import org.springframework.beans.factory.annotation.Autowired

abstract class AbstractDao {
    @Autowired
    protected lateinit var sqlSession: SqlSession
}

ここまでで一応完成です。

InfrastructureProxyの実装について

前述したように、Transactionを行う場合はorg.springframework.core.InfrastructureProxyを実装する必要があります。
これは、例えばSqlException等の例外が発生した際にMybatisから返されるDatasourceが こちらがoverrideしたlookUpKeyで決定される前のdatasource (今回ではdynamicRoutingDatasourceResolver) のため、例外処理内で実際に使用しているDatasourceを特定できなくなるためです。 そのため、InfrastructureProxyを実装して、現在参照しているDatasourceを返却する必要があります。

流れとしては、例外発生時、MybatisがSqlExceptionをDataAccessExceptionに変換しようとして、 その処理内で現在のDatasource / Connectionを取得するために、最終的に以下にいきます。 https://github.com/spring-projects/spring-framework/blob/master/spring-tx/src/main/java/org/springframework/transaction/support/TransactionSynchronizationUtils.java#L66

ここでInfrastructureProxyを実装していないと、dynamicRoutingDatasourceResolverが返されます。
すると、以下でConnectionHolderがnullで返され、新しいConnectionが生成されてしまいます。 https://github.com/spring-projects/spring-framework/blob/master/spring-jdbc/src/main/java/org/springframework/jdbc/datasource/DataSourceUtils.java#L103

現在保持している本当のconnectionはリリースされてcloseされます。 そして、Springはすでにclose されたconnectionについてrollback等の処理を開始するためConnection is closedと怒られてしまいます。

java.sql.SQLException: Connection is closed
    at com.zaxxer.hikari.pool.ProxyConnection$ClosedConnection.lambda$getClosedConnection$0(ProxyConnection.java:489)
    at com.sun.proxy.$Proxy89.isReadOnly(Unknown Source)
    at com.zaxxer.hikari.pool.HikariProxyConnection.isReadOnly(HikariProxyConnection.java)
    at org.springframework.jdbc.datasource.DataSourceUtils.resetConnectionAfterTransaction(DataSourceUtils.java:237)
    at org.springframework.jdbc.datasource.DataSourceTransactionManager.doCleanupAfterCompletion(DataSourceTransactionManager.java:376)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.cleanupAfterCompletion(AbstractPlatformTransactionManager.java:1007)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processRollback(AbstractPlatformTransactionManager.java:878)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.rollback(AbstractPlatformTransactionManager.java:812)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.completeTransactionAfterThrowing(TransactionAspectSupport.java:551)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:298)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
    at com.example.demo.application.usecase.SampleFacade$$EnhancerBySpringCGLIB$$668ad13.update(<generated>)
    at com.example.demo.web.controller.SampleController.update(SampleController.kt:21)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:189)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:892)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1038)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005)
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:897)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:634)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:741)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:200)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408)
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:834)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1415)
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:748)

上記の実装についてはサンプルが少なかったので、本当にこれで正しいかはちょっと不安ですが。。説明的にも不安。

まとめ

Spring Boot × MyBatis × HikariCPでの透過的な複数データソース実装についてまとめました。
どうしても複雑化するので、必要がない限りは静的な実装を利用したほうが良いかなと思います!