【SpringBoot应用篇】SpringBoot集成atomikos实现多数据源配置和分布式事务管理

【SpringBoot应用篇】SpringBoot集成atomikos实现多数据源配置和分布式事务管理

  • 分布式事务概念
  • XA和JTA概述
  • SpringBoot集成atomikos
    • 数据库结构
    • pom
    • 通用工具类
      • R
      • BaseController
      • BaseExceptionCode
      • ExceptionCode
      • BaseException
      • BaseUncheckedException
      • BizException
    • application.yml
    • 数据源配置类
      • OrderXADataSourceConfig
      • UserXADataSourceConfig
    • 实体类
      • Order
      • User
    • Mapper
      • OrderMapper
      • UserMapper
    • Service
      • OrderService
        • 测试分布式事务
      • UserService
        • 测试分布式事务
    • Controller
      • OrderController
      • UserController
    • 启动类

分布式事务概念

讨论分布式事务之前我们分清两个概念:本地事务分布式事务

本地事务是解决单个数据源上的数据操作的一致性问题的话,而分布式事务则是为了解决跨越多个数据源上数据操作的一致性问题。

百度官方对分布式事务的定义是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。

也就是说我们在操作一个业务逻辑过程中,涉及两个数据源(A、B),且很多时候A、B这两个数据源属于两个不同的物理环境。当我们操作A数据源过程中出现异常情况,那么必须让针对B数据源的操作回滚,同时A数据源的操作也回滚。

在Java开发过程中事务一般使用Spring为我们提供了方便的声明式事务方法@transactional。但是默认的Spring事务只支持单数据源,而实际上一个系统往往需要写多个数据源,这个时候我们就需要考虑如何通过Spring实现对分布式事务的支持。
SpringBoot官方提供推荐了Atomikos和 Bitronix两种无需服务器支持的分布式事务组件

JAVA领域中针对分布式事务的解决方案就是JTA(即Java Transaction API);

XA和JTA概述

XA 是由 X/Open 组织提出的分布式事务的一种协议(或者称之为分布式架构)。它主要定义了两部分的管理器,全局事务管理器及资源管理器。在 XA 的设计理念中,把不同资源纳入到一个事务管理器进行统一管理,例如数据库资源,消息中间件资源等,从而进行全部资源的事务提交或者取消,目前主流的数据库,消息中间件都支持 XA 协议。

JTA 叫做 Java Transaction API,它是 XA 协议的 JAVA 实现。目前在 JAVA 里面,关于 JTA 的定义主要是两部分

  • 事务管理器接口-----javax.transaction.TransactionManager
  • 资源管理器接口-----javax.transaction.xa.XAResource

在一般应用采用 JTA 接口实现事务,需要一个外置的 JTA 容器来存储这些事务,像 Tomcat。今天我们要讲的是 Atomikos,它是一个独立实现了 JTA 的框架,能够在我们的应用服务器中运行 JTA 事务。

SpringBoot集成atomikos

在这里插入图片描述

数据库结构

在这里插入图片描述

CREATE TABLE `tb_order` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '订单id',
  `user_id` bigint DEFAULT NULL COMMENT '用户id',
  `name` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '商品名称',
  `price` bigint DEFAULT NULL COMMENT '商品价格',
  `num` int DEFAULT '0' COMMENT '商品数量',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=137 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=COMPACT;

添加了username为唯一索引,方便后面测试多数据插库异常事务回滚

CREATE TABLE `tb_user` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `username` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '收件人',
  `address` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '地址',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_uername` (`username`)
) ENGINE=InnoDB AUTO_INCREMENT=34 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=COMPACT;

pom

技术栈版本号
springboot2.3.2.RELEASE
druid1.1.10
mysql驱动8.0.33
mybatis-plus3.1.1
<parent>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-parent</artifactId>
	<version>2.3.2.RELEASE</version>
</parent>

<dependencies>
    <!-- druid-->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.10</version>
    </dependency>
    <!-- druid-->
    <!-- mysql-connector-->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
       <version>8.0.33</version>
    </dependency>
    <!-- mysql-connector-->
    <!-- mybatis-plus-->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.1.1</version>
    </dependency>
    <!-- mybatis-plus-->

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.25</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>32.0.0-jre</version>
    </dependency>
</dependencies>


<build>
    <resources>
        <resource>
            <directory>src/main/java</directory>
            <includes>
                <include>**/*.xml</include>
            </includes>
            <filtering>false</filtering>
        </resource>
    </resources>
</build>

通用工具类

R

/**
 * @ClassName: R
 * @Description: 统一返回实体
 */
@Getter
@Setter
@SuppressWarnings({"AlibabaClassNamingShouldBeCamel"})
@Accessors(chain = true)
public class R<T> {
    public static final String DEF_ERROR_MESSAGE = "系统繁忙,请稍候再试";
    public static final String HYSTRIX_ERROR_MESSAGE = "请求超时,请稍候再试";
    public static final int SUCCESS_CODE = 0;
    public static final int FAIL_CODE = -1;
    public static final int TIMEOUT_CODE = -2;
    /**
     * 统一参数验证异常
     */
    public static final int VALID_EX_CODE = -9;
    public static final int OPERATION_EX_CODE = -10;
    /**
     * 调用是否成功标识,0:成功,-1:系统繁忙,此时请开发者稍候再试 详情见[ExceptionCode]
     */
    private int code;

    /**
     * 调用结果
     */
    private T data;

    /**
     * 结果消息,如果调用成功,消息通常为空T
     */
    private String msg = "ok";


    private String path;
    /**
     * 附加数据
     */
    private Map<String, Object> extra;

    /**
     * 响应时间
     */
    private long timestamp = System.currentTimeMillis();

    private R() {
        super();
    }

    public R(int code, T data, String msg) {
        this.code = code;
        this.data = data;
        this.msg = msg;
    }

    public static <E> R<E> result(int code, E data, String msg) {
        return new R<>(code, data, msg);
    }

    /**
     * 请求成功消息
     *
     * @param data 结果
     * @return RPC调用结果
     */
    public static <E> R<E> success(E data) {
        return new R<>(SUCCESS_CODE, data, "ok");
    }

    public static R<Boolean> success() {
        return new R<>(SUCCESS_CODE, true, "ok");
    }

    /**
     * 请求成功方法 ,data返回值,msg提示信息
     *
     * @param data 结果
     * @param msg  消息
     * @return RPC调用结果
     */
    public static <E> R<E> success(E data, String msg) {
        return new R<>(SUCCESS_CODE, data, msg);
    }

    /**
     * 请求失败消息
     *
     * @param msg
     * @return
     */
    public static <E> R<E> fail(int code, String msg) {
        return new R<>(code, null, (msg == null || msg.isEmpty()) ? DEF_ERROR_MESSAGE : msg);
    }

    public static <E> R<E> fail(String msg) {
        return fail(OPERATION_EX_CODE, msg);
    }

    public static <E> R<E> fail(String msg, Object... args) {
        String message = (msg == null || msg.isEmpty()) ? DEF_ERROR_MESSAGE : msg;
        return new R<>(OPERATION_EX_CODE, null, String.format(message, args));
    }

    public static <E> R<E> fail(BaseExceptionCode exceptionCode) {
        return validFail(exceptionCode);
    }

    public static <E> R<E> fail(BizException exception) {
        if (exception == null) {
            return fail(DEF_ERROR_MESSAGE);
        }
        return new R<>(exception.getCode(), null, exception.getMessage());
    }

    /**
     * 请求失败消息,根据异常类型,获取不同的提供消息
     *
     * @param throwable 异常
     * @return RPC调用结果
     */
    public static <E> R<E> fail(Throwable throwable) {
        return fail(FAIL_CODE, throwable != null ? throwable.getMessage() : DEF_ERROR_MESSAGE);
    }

    public static <E> R<E> validFail(String msg) {
        return new R<>(VALID_EX_CODE, null, (msg == null || msg.isEmpty()) ? DEF_ERROR_MESSAGE : msg);
    }

    public static <E> R<E> validFail(String msg, Object... args) {
        String message = (msg == null || msg.isEmpty()) ? DEF_ERROR_MESSAGE : msg;
        return new R<>(VALID_EX_CODE, null, String.format(message, args));
    }

    public static <E> R<E> validFail(BaseExceptionCode exceptionCode) {
        return new R<>(exceptionCode.getCode(), null,
                (exceptionCode.getMsg() == null || exceptionCode.getMsg().isEmpty()) ? DEF_ERROR_MESSAGE : exceptionCode.getMsg());
    }

    public static <E> R<E> timeout() {
        return fail(TIMEOUT_CODE, HYSTRIX_ERROR_MESSAGE);
    }


    public R<T> put(String key, Object value) {
        if (this.extra == null) {
            this.extra = Maps.newHashMap();
        }
        this.extra.put(key, value);
        return this;
    }

    /**
     * 逻辑处理是否成功
     *
     * @return 是否成功
     */
    public Boolean getIsSuccess() {
        return this.code == SUCCESS_CODE || this.code == 200;
    }

    /**
     * 逻辑处理是否失败
     *
     * @return
     */
    public Boolean getIsError() {
        return !getIsSuccess();
    }

    @Override
    public String toString() {
        return JSONObject.toJSONString(this);
    }
}

BaseController

/**
 * @ClassName: BaseController
 * @Description: controller 抽象基类
 */
public abstract class BaseController {
    /**
     * 成功返回
     *
     * @param data
     * @return
     */
    public <T> R<T> success(T data) {
        return R.success(data);
    }

    public R<Boolean> success() {
        return R.success();
    }

    /**
     * 失败返回
     *
     * @param msg
     * @return
     */
    public <T> R<T> fail(String msg) {
        return R.fail(msg);
    }

    public <T> R<T> fail(String msg, Object... args) {
        return R.fail(msg, args);
    }

    /**
     * 失败返回
     *
     * @param code
     * @param msg
     * @return
     */
    public <T> R<T> fail(int code, String msg) {
        return R.fail(code, msg);
    }

    public <T> R<T> fail(BaseExceptionCode exceptionCode) {
        return R.fail(exceptionCode);
    }

    public <T> R<T> fail(BizException exception) {
        return R.fail(exception);
    }

    public <T> R<T> fail(Throwable throwable) {
        return R.fail(throwable);
    }

    public <T> R<T> validFail(String msg) {
        return R.validFail(msg);
    }

    public <T> R<T> validFail(String msg, Object... args) {
        return R.validFail(msg, args);
    }

    public <T> R<T> validFail(BaseExceptionCode exceptionCode) {
        return R.validFail(exceptionCode);
    }
}

BaseExceptionCode


/**
 * @ClassName: BaseExceptionCode
 * @Description: 公共异常编码类
 */
public interface BaseExceptionCode {
    /**
     * 异常编码
     *
     * @return
     */
    int getCode();

    /**
     * 异常消息
     * @return
     */
    String getMsg();
}

ExceptionCode

/**
 * 全局错误码 10000-15000
 * <p>
 * 预警异常编码    范围: 30000~34999
 * 标准服务异常编码 范围:35000~39999
 * 邮件服务异常编码 范围:40000~44999
 * 短信服务异常编码 范围:45000~49999
 * 权限服务异常编码 范围:50000-59999
 * 文件服务异常编码 范围:60000~64999
 * 日志服务异常编码 范围:65000~69999
 * 消息服务异常编码 范围:70000~74999
 * 开发者平台异常编码 范围:75000~79999
 * 搜索服务异常编码 范围:80000-84999
 * 共享交换异常编码 范围:85000-89999
 * 移动终端平台 异常码 范围:90000-94999
 * <p>
 * 安全保障平台    范围:        95000-99999
 * 软硬件平台 异常编码 范围:    100000-104999
 * 运维服务平台 异常编码 范围:  105000-109999
 * 统一监管平台异常 编码 范围:  110000-114999
 * 认证方面的异常编码  范围:115000-115999
 *
 */
public enum ExceptionCode implements BaseExceptionCode {

    //系统相关 start
    SUCCESS(0, "成功"),
    SYSTEM_BUSY(-1, "系统繁忙~请稍后再试~"),
    SYSTEM_TIMEOUT(-2, "系统维护中~请稍后再试~"),
    PARAM_EX(-3, "参数类型解析异常"),
    SQL_EX(-4, "运行SQL出现异常"),
    NULL_POINT_EX(-5, "空指针异常"),
    ILLEGALA_ARGUMENT_EX(-6, "无效参数异常"),
    MEDIA_TYPE_EX(-7, "请求类型异常"),
    LOAD_RESOURCES_ERROR(-8, "加载资源出错"),
    BASE_VALID_PARAM(-9, "统一验证参数异常"),
    OPERATION_EX(-10, "操作异常"),


    OK(200, "OK"),
    BAD_REQUEST(400, "错误的请求"),
    /**
     * {@code 401 Unauthorized}.
     *
     * @see <a href="http://tools.ietf.org/html/rfc7235#section-3.1">HTTP/1.1: Authentication, section 3.1</a>
     */
    UNAUTHORIZED(401, "未经授权"),
    /**
     * {@code 404 Not Found}.
     *
     * @see <a href="http://tools.ietf.org/html/rfc7231#section-6.5.4">HTTP/1.1: Semantics and Content, section 6.5.4</a>
     */
    NOT_FOUND(404, "没有找到资源"),
    METHOD_NOT_ALLOWED(405, "不支持当前请求类型"),

    TOO_MANY_REQUESTS(429, "请求超过次数限制"),
    INTERNAL_SERVER_ERROR(500, "内部服务错误"),
    BAD_GATEWAY(502, "网关错误"),
    GATEWAY_TIMEOUT(504, "网关超时"),
    //系统相关 end

    REQUIRED_FILE_PARAM_EX(1001, "请求中必须至少包含一个有效文件"),
    //jwt token 相关 start

    JWT_TOKEN_EXPIRED(40001, "会话超时,请重新登录"),
    JWT_SIGNATURE(40002, "不合法的token,请认真比对 token 的签名"),
    JWT_ILLEGAL_ARGUMENT(40003, "缺少token参数"),
    JWT_GEN_TOKEN_FAIL(40004, "生成token失败"),
    JWT_PARSER_TOKEN_FAIL(40005, "解析token失败"),
    JWT_USER_INVALID(40006, "用户名或密码错误"),
    JWT_USER_ENABLED(40007, "用户已经被禁用!"),
    //jwt token 相关 end

    ;

    private int code;
    private String msg;

    ExceptionCode(int code, String msg) {
        this.code = code;
        this.msg = msg;
    }

    @Override
    public int getCode() {
        return code;
    }

    @Override
    public String getMsg() {
        return msg;
    }


    public ExceptionCode build(String msg, Object... param) {
        this.msg = String.format(msg, param);
        return this;
    }

    public ExceptionCode param(Object... param) {
        msg = String.format(msg, param);
        return this;
    }
}


BaseException

/**
 * @ClassName: BaseException
 * @Description: 异常接口类
 */
public interface BaseException {

    /**
     * 统一参数验证异常码
     */
    int BASE_VALID_PARAM = -9;

    /**
     * 返回异常信息
     *
     * @return
     */
    String getMessage();

    /**
     * 返回异常编码
     *
     * @return
     */
    int getCode();
}

BaseUncheckedException

/**
 * @ClassName: BaseUncheckedException
 * @Description: 非运行期异常基类,所有自定义非运行时异常继承该类
 */
public class BaseUncheckedException extends RuntimeException implements BaseException {

    private static final long serialVersionUID = -778887391066124051L;

    /**
     * 异常信息
     */
    protected String message;

    /**
     * 具体异常码
     */
    protected int code;

    public BaseUncheckedException(int code, String message) {
        super(message);
        this.code = code;
        this.message = message;
    }

    public BaseUncheckedException(int code, String format, Object... args) {
        super(String.format(format, args));
        this.code = code;
        this.message = String.format(format, args);
    }


    @Override
    public String getMessage() {
        return message;
    }
    @Override
    public int getCode() {
        return code;
    }
}

BizException

/**
 * @ClassName: BizException
 * @Description: 业务异常 用于在处理业务逻辑时,进行抛出的异常。
 */
public class BizException extends BaseUncheckedException {

    private static final long serialVersionUID = -3843907364558373817L;

    public BizException(String message) {
        super(-1, message);
    }

    public BizException(int code, String message) {
        super(code, message);
    }

    public BizException(int code, String message, Object... args) {
        super(code, message, args);
    }

    /**
     * 实例化异常
     *
     * @param code    自定义异常编码
     * @param message 自定义异常消息
     * @param args    已定义异常参数
     * @return
     */
    public static BizException wrap(int code, String message, Object... args) {
        return new BizException(code, message, args);
    }

    public static BizException wrap(String message, Object... args) {
        return new BizException(-1, message, args);
    }

    public static BizException validFail(String message, Object... args) {
        return new BizException(-9, message, args);
    }

    public static BizException wrap(BaseExceptionCode ex) {
        return new BizException(ex.getCode(), ex.getMsg());
    }

    @Override
    public String toString() {
        return "BizException [message=" + message + ", code=" + code + "]";
    }
}

application.yml

spring:
  datasource:
    druid:
      order:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/cloud_order?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&nullCatalogMeansCurrent=true
        user: root
        password: root
      user:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/cloud_user?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&nullCatalogMeansCurrent=true
        user: root
        password: root



mybatis-plus:
  #mybatis日志
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

logging:
  level:
    cn.zysheep.dao: debug

数据源配置类

OrderXADataSourceConfig

/**
 * @ClassName: OrderXADataSourceConfig
 * @Description: mybatis配置类 Order
 */
@Configuration
@MapperScan(basePackages = OrderXADataSourceConfig.BASE_PACKAGES, sqlSessionTemplateRef = "orderSqlSessionTemplate")
public class OrderXADataSourceConfig {
    /**
     * 扫描mapper接口包
     */
    static final String BASE_PACKAGES = "cn.zysheep.dao.order";
    /**
     * 扫描的mapper配置文件路径
     */
    private static final String MAPPER_LOCATION = "classpath:/mapper/order/*Mapper.xml";

    /**
     * 将这个对象放入spring容器中(交给Spring管理)
     * @ConfigurationProperties 自动配置属性
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.druid.order")
    public XADataSource getDataSourceOrder(){
        // 创建XA连接池
        return new MysqlXADataSource();
    }

    /**
     * 创建Atomikos数据源
     * 注解@DependsOn("druidXADataSourcePre"),在名为druidXADataSourcePre的bean实例化后加载当前bean
     * @param xaDataSource
     * @return
     */
    @Bean
    @DependsOn("getDataSourceOrder")
    @Primary
    public DataSource dataSourceOrder(@Qualifier("getDataSourceOrder") XADataSource xaDataSource){
        //这里的AtomikosDataSourceBean使用的是spring提供的
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        // 必须为数据源指定唯一标识
        atomikosDataSourceBean.setUniqueResourceName("dataSourceOrder");
        atomikosDataSourceBean.setPoolSize(5);
        atomikosDataSourceBean.setTestQuery("select 1");
        atomikosDataSourceBean.setBorrowConnectionTimeout(3);
        atomikosDataSourceBean.setXaDataSource(xaDataSource);

        return atomikosDataSourceBean;
    }

    /**
     * 创建 SqlSessionFactory
     * @return
     * @throws Exception
     */
    @Bean
    @Primary
    public SqlSessionFactory orderSqlSessionFactory(@Qualifier("dataSourceOrder") DataSource dataSource) throws Exception{
        // 用来创建 SqlSessionFactory 等同于下面配置
//        <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
//            <property name="dataSource" ref="dataSource" />
//            <property name="mapperLocations" value="classpath:mybatis-mapper/*.xml"/>
//        </bean>

        // 在配置sqlSession工厂类的时候,创建的是MybatisSqlSessionFactoryBean,是为了能够正常使用Mybatis-Plus组件的基本功能,比如通用的crud语句绑定
        MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSource);
        // 设置mybatis的xml所在位置(扫描mybatis的相关xml文件,装配到容器中)
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(MAPPER_LOCATION));
        //手动设置session工厂时,需要手动添加分页插件
        Interceptor[] plugins = new Interceptor[1];
        plugins[0] = new PaginationInterceptor();
        sqlSessionFactoryBean.setPlugins(plugins);

        return sqlSessionFactoryBean.getObject();
    }

    /**
     * 通过 SqlSessionFactory 来创建 SqlSessionTemplate
     * @param sqlSessionFactory
     * @return
     */
    @Bean
    @Primary
    public SqlSessionTemplate orderSqlSessionTemplate(@Qualifier("orderSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
        // SqlSessionTemplate是线程安全的,可以被多个DAO所共享使用
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

UserXADataSourceConfig

/**
 * @ClassName: UserXADataSourceConfig
 * @Description: mybatis配置类 User
 */
@Configuration
@MapperScan(basePackages = UserXADataSourceConfig.BASE_PACKAGES, sqlSessionTemplateRef = "userSqlSessionTemplate")
public class UserXADataSourceConfig {
    /**
     * 扫描mapper接口包
     */
    static final String BASE_PACKAGES = "cn.zysheep.dao.user";
    /**
     * 扫描的mapper配置文件路径
     */
    private static final String MAPPER_LOCATION = "classpath:/mapper/user/*Mapper.xml";


    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.druid.user")
    public XADataSource getDataSourceUser(){
        // 创建XA连接池
        return new MysqlXADataSource();
    }

    @Bean
    @DependsOn("getDataSourceUser")
    public DataSource dataSourceUser(@Qualifier("getDataSourceUser") XADataSource xaDataSource){
        //这里的AtomikosDataSourceBean使用的是spring提供的
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setUniqueResourceName("dataSourceUser");
        atomikosDataSourceBean.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        atomikosDataSourceBean.setTestQuery("select 1");
        atomikosDataSourceBean.setBorrowConnectionTimeout(3);
        atomikosDataSourceBean.setXaDataSource(xaDataSource);
        return atomikosDataSourceBean;
    }

    /**
     * 创建 SqlSessionFactory
     * @return
     * @throws Exception
     */
    @Bean
    @Primary
    public SqlSessionFactory userSqlSessionFactory(@Qualifier("dataSourceUser") DataSource dataSource) throws Exception{
        // 用来创建 SqlSessionFactory 等同于下面配置
//        <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
//            <property name="dataSource" ref="dataSource" />
//            <property name="mapperLocations" value="classpath:mybatis-mapper/*.xml"/>
//        </bean>

        // 在配置sqlSession工厂类的时候,创建的是MybatisSqlSessionFactoryBean,是为了能够正常使用Mybatis-Plus组件的基本功能,比如通用的crud语句绑定
        MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSource);
        // 设置mybatis的xml所在位置(扫描mybatis的相关xml文件,装配到容器中)
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(MAPPER_LOCATION));
        //手动设置session工厂时,需要手动添加分页插件
        Interceptor[] plugins = new Interceptor[1];
        plugins[0] = new PaginationInterceptor();
        sqlSessionFactoryBean.setPlugins(plugins);

        return sqlSessionFactoryBean.getObject();
    }

    /**
     * 通过 SqlSessionFactory 来创建 SqlSessionTemplate
     * @param sqlSessionFactory
     * @return
     */
    @Bean
    @Primary
    public SqlSessionTemplate userSqlSessionTemplate(@Qualifier("userSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
        // SqlSessionTemplate是线程安全的,可以被多个DAO所共享使用
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

1、每个数据源对应一个配置类
2、每个配置类的@MapperScan注解不一样,各自对应自己mapper接口文件夹(这就是为什么要将不同数据源的mapper接口写在不同文件夹的原因了)
3、在配置sqlSession工厂类的时候,创建的是MybatisSqlSessionFactoryBean,是为了能够正常使用Mybatis-Plus组件的基本功能,比如通用的crud语句绑定。
4、配置工厂类的时候,需要指定各自mapper.xml存放的路径(这就是为什么要将不同数据源的mapper.xml写在不同文件夹的原因了)
5、配置工厂类的时候,需要手动将分页插件加进去。因为数据源相关的自动配置被我们关闭了,创建传统PaginationInterceptor类的方法已经不好使了

实体类

Order

@Builder
@Data
@TableName("tb_order")
public class Order {
    @TableId(type = IdType.AUTO)
    private Long id;
    @TableField("user_id")
    private Long userId;
    @TableField("name")
    private String name;
    @TableField("price")
    private Long price;
    @TableField("num")
    private Integer num;
}

User

@Builder
@Data
@TableName("tb_user")
public class User {
    @TableId(type = IdType.AUTO)
    private Long id;
    @TableField("username")
    private String username;
    @TableField("address")
    private String address;
}

Mapper

OrderMapper

在这里插入图片描述

public interface OrderMapper extends BaseMapper<Order> {
}

配置文件 OrderMapper.xml,Mapper类路径、和配置路径必须数据源配置类的路径一致否则会报错
在这里插入图片描述

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.zysheep.dao.order.OrderMapper">

</mapper>

UserMapper

public interface UserMapper extends BaseMapper<User> {
}

配置文件UserMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.zysheep.dao.user.UserMapper">

</mapper>

Service

OrderService

public interface OrderService extends IService<Order> {
    /**
     * 保存订单
     */
    void saveOrder() throws Exception;
}
@Service
@AllArgsConstructor
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {

    private final OrderMapper orderMapper;

    private final UserMapper userMapper;

    /**
     * 实现多数据库操作
     * @return
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void saveOrder() throws Exception {
        // order数据源
        Order order = Order.builder()
                .userId(1000L)
                .name("Apple 苹果 iPhone 12 ")
                .price(699900L)
                .num(1).build();
        orderMapper.insert(order);

        // user数据源
        User user = User.builder()
                .id(1001L)
                .address("长沙")
                .username("封于修").build();

        userMapper.insert(user);

        // throw new Exception("12312");
    }
}

测试分布式事务

atomikos多数据源分布式事务和Spring声明式事务使用方法一样,类或方法加@Transactional注解。

1、正常保存

2、order数据源保存成功,user数据源保存成功,方法其他地方抛出异常,方法事务回滚

3、order数据源保存成功,user数据源保存失败,方法事务回滚

4、order数据源保存失败,user数据源保存不执行,方法事务回滚

UserService

public interface UserService extends IService<User> {
    /**
     * 保存用户
     * @throws Exception
     */
    void saveUser() throws Exception;
}
@Service
@AllArgsConstructor
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {

    private final OrderMapper orderMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void saveUser() throws Exception {
        // order数据源  抛出异常,方法事务回滚
        Order order = Order.builder()
                .userId(1000L)
                .name("Apple 苹果 iPhone 12 ")
                .price(699900L)
                .num(1).build();
        orderMapper.insert(order);


        // 2、user数据源 抛出异常,方法事务回滚
        User user = User.builder()
                .id(1001L)
                .address("长沙")
                .username("封于修").build();

        saveBatch(Collections.singletonList(user));

        // 1、主方法抛出异常,方法事务回滚
        // throw new Exception("12312");
    }
}

测试分布式事务

atomikos多数据源分布式事务和Spring声明式事务使用方法一样,类或方法加@Transactional注解。

这里主要测试Mybaits-Plus提供的批量新增是否支持 atomikos多数据源分布式事务,测试是方法内部其他数据源发生异常事务是可以回滚的;

Controller

OrderController

@RestController
@RequestMapping("/order")
@AllArgsConstructor
public class OrderController extends BaseController {

    private final OrderService orderService;

    @PostMapping("/save")
    public R save() throws Exception {
        orderService.saveOrder();
        return success();
    }
}

UserController

@RestController
@RequestMapping("/user")
@AllArgsConstructor
public class UserController extends BaseController {
    private final UserService userService;

    @PostMapping("/batchSave")
    public R batchSave() throws Exception {
        userService.saveUser();
        return success();
    }
}

启动类

@SpringBootApplication
public class AtomikosApplication {
    public static void main(String[] args) {
        SpringApplication.run(AtomikosApplication.class, args);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/39743.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++初阶 - 3.类和对象(中)

目录 1.类的6个默认成员函数 2.构造函数 2.2特性 3.析构函数 3.1 概念 3.2 特性 4. 拷贝构造函数 4.1 概念 4.2 特征 5.赋值运算符重载 5.1运算符重载 5.2 赋值运算符重载 5.3 前置和后置重载 6.日期类的实现 7.const成员 8.取地址及const取地址操作符重载 1.类…

为什么项目可见性难以实现?该如何提高?

在项目和专业服务管理中&#xff0c;失败有时难以避免。沟通不足和需求定义不明确被认为是造成失败的最大原因&#xff0c;这意味着项目可见性和信息流动至关重要。 什么是项目可见性&#xff1f; 项目可见性是组织项目相关信息的方式&#xff0c;以便所有团队成员、项目经理…

使用Jenkins自由风格的软件项目实现接口自动化测试持续集成

这里写目录标题 一、JOB项目配置1、添加描述2、限制项目的运行节点3、源码管理4、构建触发器5、构建步骤6、构建后操作 一、JOB项目配置 1、添加描述 可选选项可填可不填 2、限制项目的运行节点 节点中要有运行环境所需的配置 节点配置教程&#xff1a;https://blog.csdn…

Go语言之并发编程练习,GO协程初识,互斥锁,管道:channel的读写操作,生产者消费者

GO协程初识 package mainimport ("fmt""sync""time" )func read() {defer wg.Done()fmt.Println("read start")time.Sleep(time.Second * 3)fmt.Println("read end") }func listenMusci() {defer wg.Done()fmt.Println(&qu…

在云计算环境中,保护Java应用程序可用的有效措施和工具

云计算&#xff08;Cloud&#xff09;技术是近年来计算机科学的一个重要突破。大多数组织已经通过将自己的应用程序移入云平台而获益。不过&#xff0c;如何保证应用程序在第三方服务器上的安全性&#xff0c;是一项艰巨的挑战。 在本文中&#xff0c;我们将重点讨论Java&…

长沙打造“全球研发中心城市”,智能网联产业如何交卷?

作者 | 魏启扬 来源 | 洞见新研社 知乎上有一个浏览超百万的热门问题——“大家怎么看待长沙这个城市&#xff1f;” 答主“星球研究所”的回答获得了高赞&#xff0c;“这是一个天性如火的城市”。 网红城市的外衣下&#xff0c;从湖南卫视的综艺节目&#xff0c;到网红美…

数据结构--栈

一、栈 数组是一种连续存储、随机访问的线性表&#xff0c;链表属于分散存储、连续访问的线性表。它们每个数据都有其相对位置&#xff0c;有至多一个直接前驱和之多一个直接后继。栈&#xff08;Stack&#xff09;和队列&#xff08;Queue&#xff09;也属于线性表&#xff0c…

Fortinet Accelerate 2023·中国区巡展收官丨让安全成就未来

7月18日&#xff0c;2023 Fortinet Accelerate Summit在上海成功举办&#xff01;这亦象征着“Fortinet Accelerate2023中国区巡展”圆满收官。Fortinet携手来自多个典型行业的百余位代表客户&#xff0c;以及亚马逊云科技、Telstra - PBS 太平洋电信、Tenable等多家生态合作伙…

字幕切分视频

Whisper 仓库地址&#xff1a; https://github.com/openai/whisper 可用模型信息&#xff1a; 测试视频&#xff1a;18段&#xff0c;总共447S视频&#xff08;11段前&#xff1a;有11段开头有停顿的视频&#xff09; Tiny: 跑完&#xff1a;142S &#xff0c;11段前&#xf…

学习opencv.js之基本使用方法(读取,显示,灰度化,边缘检测,特征值点检测)

opencv.js是什么 OpenCV.js 是 OpenCV&#xff08;Open Source Computer Vision Library&#xff09;的 JavaScript 版本。OpenCV 是一个广泛使用的计算机视觉和图像处理库&#xff0c;提供了一系列功能强大的算法和工具&#xff0c;用于处理图像、视频、特征提取、对象识别等…

无虚拟 DOM 版 Vue 进行到哪一步了?

前言 就在一年前的 Vue Conf 2022&#xff0c;尤雨溪向大家分享了一个非常令人期待的新模式&#xff1a;无虚拟 DOM 模式&#xff01; 我看了回放之后非常兴奋&#xff0c;感觉这是个非常牛逼的新 feature&#xff0c;鉴于可能会有部分人还不知道或者还没听过什么是 Vue 无虚…

智能电表数据采集器

智能电表数据采集器是一种用于采集智能电表数据的设备&#xff0c;它可以将智能电表的数据传输到远程服务器上&#xff0c;以便进行数据分析和监控。智能电表数据采集器的主要功能是采集智能电表的实时数据&#xff0c;并将其发送到远程服务器上&#xff0c;从而实现对智能电表…

C语言--程序环境和预处理

翻译环境 C语言的代码是文本信息&#xff0c;对于计算机来说无法直接理解&#xff0c;需要通过翻译环境进行翻译成二进制信息&#xff1b; 我们在写代码的时候&#xff0c;一般都会写在一个源文件中&#xff0c;这时候我们就使用我们的编译器(VS)将其转换为机器代码&#xff0…

关于GPT、AI绘画、AI提词器等AI技术的探讨

目前的AI潮流非常火热&#xff0c;CHATGPT可谓是目前大模型人工智能的代表&#xff0c;刚开始听说chatGPT可以写代码&#xff0c;写作&#xff0c;写方案&#xff0c;无所不能。还有AI绘画也很&#xff2e;&#xff22;作为一个程序员&#xff0c;为了体验这些&#xff21;&…

node基于express+mongodb项目的整体结构搭建和逻辑抽离

一、为什么需要逻辑抽离 这是我用express实现的一个缩减版的注册功能,如下&#xff1a; app.js const express require("express"); const app express();// 连接数据库 const mongoose require("mongoose"); // 连接数据库myTest mongoose.connect(…

【云原生】k8s之Ingress

1.Ingress的相关知识 1.1 Ingress的简介 service的作用体现在两个方面&#xff0c;对集群内部&#xff0c;它不断跟踪pod的变化&#xff0c;更新endpoint中对应pod的对象&#xff0c;提供了ip不断变化的pod的服务发现机制&#xff1b;对集群外部&#xff0c;他类似负载均衡器…

【剧前爆米花--web】HTTP协议格式详解以及构造

作者&#xff1a;困了电视剧 专栏&#xff1a;《JavaEE初阶》 文章分布&#xff1a;这是一篇关于HTTP协议的文章&#xff0c;在这篇文章中我会说明HTTP协议格式以及相关的构造&#xff0c;希望对你有所帮助&#xff01; 目录 HTTP协议 HTTP协议格式 HTTP请求 HTTP响应详情…

前端基本功 用 React Hooks + Antd 实现一个 Todo-List

背景 使用 React Hooks 以及组件库 Antd 来实现一个可以 增删 标记是否完成 的 todo-list 思路 要实现一个 todo-list 首先想到用 useState 维护一个状态数组来保存当前 list &#xff0c;还要用一个状态维护添加框中的内容 const [todos, setTodos] useState(initialValu…

如何用双指针法解决力扣“反转单词前缀”问题

本篇博客会讲解力扣“2000. 反转单词前缀”的解题思路&#xff0c;这是题目链接。 本题的思路是&#xff1a;先调用strchr函数&#xff0c;在字符串word中查找字符ch&#xff0c;若找到了&#xff0c;则会返回一个非空指针p&#xff0c;指向ch在word中的位置。为了反转从word到…

【UE4 塔防游戏系列】10-防御塔升级

目录 效果 步骤 一、根据防御塔等级修改子弹伤害 二、根据防御塔等级修改子弹速度 三、根据防御塔等级修改检测半径 四、根据防御塔等级修改子弹颜色 五、根据防御塔等级修改换弹时间 效果 步骤 一、根据防御塔等级修改子弹伤害 1. 打开“TowerBaseBullet_Child”&…