SpringCloud06:Seata分布式事务

官方文档:https://seata.apache.org/zh-cn/docs/overview/what-is-seata

产生原因

比如一个分布式的商城,用户在发起购买请求的时候,会涉及多个微服务:订单微服务,库存微服务和账户微服务。每个微服务有自己独立的数据库。

在理想的情况下:

如果库存服务异常抛出 → 账户服务感知失败并回滚 → 订单服务也回滚 → 整个调用链像一个事务那样失败回滚。

确实从调用链的视角看上去像是一个“事务”,这也正是很多简单系统中的“伪分布式事务”的做法(或者叫服务级回滚)。

但现实中可能出问题的地方(为什么还需要分布式事务)

  1. 网络异常导致调用链中断:

    • 比如库存服务其实扣减了库存,但在向账户服务返回异常前网络断了。

    • 账户服务以为库存服务失败,自己回滚,但库存服务实际上成功执行了。

    • 数据不一致发生了:库存减了,订单没生成。

  2. 服务内部没有事务保证或回滚失败

    • 如果账户服务记录购买记录成功了,调用库存失败了,但账户服务的回滚操作失败或未实现,也会造成数据不一致。
  3. 异步调用 / 消息队列的参与

    • 如果你通过消息通知库存服务(而不是同步调用),就没法靠异常来驱动整个调用链回滚了。
    • 分布式系统为了性能,很多时候都会引入异步消息队列,这个时候异常是无法传递的。
  4. 各个服务之间的事务隔离性:服务之间的本地事务是独立提交和回滚的,没有一个中心协调器来确保所有服务都成功后再提交,否则就回滚。

搭建环境

需求

有这么一个业务:发起采购,会先调用Storage微服务扣减库存,接着在调用Order微服务下订单,之后再调用Account微服务扣减用户的余额。

数据库

CREATE DATABASE IF NOT EXISTS `storage_db`;
USE `storage_db`;
DROP TABLE IF EXISTS `storage_tbl`;
CREATE TABLE `storage_tbl` (
                               `id` int(11) NOT NULL AUTO_INCREMENT,
                               `commodity_code` varchar(255) DEFAULT NULL,
                               `count` int(11) DEFAULT 0,
                               PRIMARY KEY (`id`),
                               UNIQUE KEY (`commodity_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO storage_tbl (commodity_code, count) VALUES ('P0001', 100);
INSERT INTO storage_tbl (commodity_code, count) VALUES ('B1234', 10);
-- 注意此处0.3.0+ 增加唯⼀索引 ux_undo_log
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
CREATE DATABASE IF NOT EXISTS `order_db`;
USE `order_db`;
DROP TABLE IF EXISTS `order_tbl`;
CREATE TABLE `order_tbl` (
                             `id` int(11) NOT NULL AUTO_INCREMENT,
                             `user_id` varchar(255) DEFAULT NULL,
                             `commodity_code` varchar(255) DEFAULT NULL,
                             `count` int(11) DEFAULT 0,
                             `money` int(11) DEFAULT 0,
                             PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- 注意此处0.3.0+ 增加唯⼀索引 ux_undo_log
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (`id` bigint(20) NOT NULL AUTO_INCREMENT,
                         `branch_id` bigint(20) NOT NULL,
                         `xid` varchar(100) NOT NULL,
 `context` varchar(128) NOT NULL,
 `rollback_info` longblob NOT NULL,
 `log_status` int(11) NOT NULL,
 `log_created` datetime NOT NULL,
 `log_modified` datetime NOT NULL,
 `ext` varchar(100) DEFAULT NULL,
 PRIMARY KEY (`id`),
 UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
CREATE DATABASE IF NOT EXISTS `account_db`;
USE `account_db`;
DROP TABLE IF EXISTS `account_tbl`;
CREATE TABLE `account_tbl` (
   `id` int(11) NOT NULL AUTO_INCREMENT,
   `user_id` varchar(255) DEFAULT NULL,
   `money` int(11) DEFAULT 0,
   PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO account_tbl (user_id, money) VALUES ('1', 10000);
-- 注意此处0.3.0+ 增加唯⼀索引 ux_undo_log
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
   `id` bigint(20) NOT NULL AUTO_INCREMENT,
   `branch_id` bigint(20) NOT NULL,
   `xid` varchar(100) NOT NULL,
   `context` varchar(128) NOT NULL,
   `rollback_info` longblob NOT NULL,
   `log_status` int(11) NOT NULL,
   `log_created` datetime NOT NULL,
   `log_modified` datetime NOT NULL,
   `ext` varchar(100) DEFAULT NULL,
   PRIMARY KEY (`id`),
   UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

微服务本地事务

在对应方法上添加@Transactional注解,如:

@Transactional
@Override
public void deduct(String commodityCode, int count) {
    storageTblMapper.deduct(commodityCode, count);
    if (count == 5) {
        throw new RuntimeException("库存不足");
    }
}
场景 是否必须加 @EnableTransactionManagement
Spring Boot 项目(使用 Spring Boot Starter) 不需要手动加,自动配置已开启
传统 Spring 项目(非 Spring Boot) 需要手动加上 @EnableTransactionManagement
不确定环境 / 想确保事务功能无误 建议手动添加,安全起见

对于传统Spring项目,还需要加 @EnableTransactionManagement

@EnableTransactionManagement
@MapperScan("org.example.storage.mapper")
@EnableDiscoveryClient
@SpringBootApplication
public class SeataStorageMainApplication {

    public static void main(String[] args) {
        SpringApplication.run(SeataStorageMainApplication.class, args);
    }
}

只能控制本地事务

@Transactional
@Override
public OrderTbl create(String userId, String commodityCode, int orderCount) {
    //1、计算订单价格
    int orderMoney = calculate(commodityCode, orderCount);

    //2、扣减账户余额: 这里远程调用Account微服务,Account微服务有自己独有的Account数据库
    accountFeignClient.debit(userId, orderMoney);
    //3、保存订单
    OrderTbl orderTbl = new OrderTbl();
    orderTbl.setUserId(userId);
    orderTbl.setCommodityCode(commodityCode);
    orderTbl.setCount(orderCount);
    orderTbl.setMoney(orderMoney);

    //3、保存订单:这里是操作本地微服务(order微服务),操作其独有的order数据库
    orderTblMapper.insert(orderTbl);

    int i = 10/0;

    return orderTbl;
}

Spring 的 @Transactional 只能控制本地事务

  • 当前的事务注解@Transactional 只能控制 当前这个服务(Order 服务) 中的本地数据库事务,比如:orderTblMapper.insert(orderTbl);
  • 无法控制远程微服务(如 Account 服务)中发生的操作,比如accountFeignClient.debit(...)
  • Feign 发出的 HTTP 请求跨出了当前 JVM,Spring 的事务控制是无法感知和回滚远程微服务中的数据库操作的
服务 操作 数据库 能否被本地事务控制
Order 服务 插入订单 order ✅ 被 @Transactional 控制
Account 服务 扣减账户金额(通过 Feign) account ❌ 不在当前事务控制范围

架构原理

在Seata中,有一个TC(Transaction Coordinator) 事务协调者,用于总体管理所有的微服务。它的作用是维护全局和分支事务的状态,驱动全局事务提交或回滚。

第二个组件是TM (Transaction Manager) 事务管理器,用于定义全局事务的范围,也就是定义开始全局事务、提交或回滚全局事务。

第三个组件是RM (Resource Manager) 资源管理器,它是用于管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

整合Seata

Seata-server:Seata服务器

下载地址:https://seata.apache.org/zh-cn/download/seata-server/

启动:bash seata-server.sh

访问:http://localhost:7091

登录:密码和用户名都是seata

引入依赖

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>

配置文件

在resources下面创建一个file.conf文件,用于配置Seata-server的地址:

 service {
   #transaction service group mapping
   # vgroupMapping 表示事务的分组映射
   # default_tx_group 表示默认的事务组,这个默认事务组的名称为"default"
   vgroupMapping.default_tx_group = "default"
   #only support when registry.type=file, please don't set multiple addresses
   # default 和上面的 vgroupMapping.default_tx_group 对应
   # default.grouplist 表示事务组的地址列表
   # 注意 7091 是 Seata server web 服务器的端口
   # 8091 是 Seata server 的事务处理端口,也就是 TC 事务协调者 的端口
   default.grouplist = "127.0.0.1:8091"
   #degrade, current not support
   # enableDegrade 表示是否启用降级
   enableDegrade = false
   #disable seata
   # disableGlobalTransaction 表示是否禁用全局事务
   disableGlobalTransaction = false
 }

开启全局事务

在Business这个微服务(TM)开启全局事务:

@GlobalTransactional//表示开启全局事务
@Override
public void purchase(String userId, String commodityCode, int orderCount) {
    //1. 扣减库存
    storageFeignClient.deduct(commodityCode, orderCount);

    //2. 创建订单
    orderFeignClient.create(userId, commodityCode, orderCount);
}

这个purchase是最大事务的入口。这个时候就可以实现全局事务的控制。

总结

  1. 启动Seata-server(TC),统一管理事务
  2. 引入Seata依赖
  3. 给每个微服务编写file.conf,用于找到Seata-server(TC)
  4. 在事务的最大入口,添加@GlobalTransactional注解,开始全局事务
  5. 需要有undo_log

原理

二阶提交协议

  1. 请求发送给Business,开始采购。它会调用两个远程方法,一个扣减库存(storageFeignClient.deduct),一个创建订单(orderFeignClient.create)。一旦purchase方法标注了@GlobalTransactional注解之后,代表全局事务就开启了。方法一旦执行,就会向Seata-server(TC) 注册一个事务。每一个全局事务都会有一个唯一id。所有的分支事务全部执行完之后,全局事务才能结束

    @GlobalTransactional//表示开启全局事务
    @Override
    public void purchase(String userId, String commodityCode, int orderCount) {
        //1. 扣减库存
        storageFeignClient.deduct(commodityCode, orderCount);
    
        //2. 创建订单
        orderFeignClient.create(userId, commodityCode, orderCount);
    }
    
  2. 第一阶段:本地事务(分支事务)。下面以扣减库存为例。其他的创建订单和扣减余额也类似。

    @Transactional
    @Override
    public void deduct(String commodityCode, int count) {
        storageTblMapper.deduct(commodityCode, count);
    }
    
    • 在扣减库存之前,Seata会解析storageTblMapper.deduct的SQL语句

      update storage_tbl set count = count-2 where commodity_code='P0001'
      
    • 根据storageTblMapper.deduct的SQL语句查询前镜像。也就是拿到where的条件,生成一条查询语句,定位到要修改的数据:

      select * from storage_tbl where commodity_code='P0001'
      
    • 拿到前镜像之后,再去执行业务SQL

      update storage_tbl set count = count-2 where commodity_code='P0001'
      
    • 执行完业务SQL之后,Seata还会再根据where条件获取后镜像。也就是获取修改之后的数据。这个时候再查就是根据id查询了。

      select * from storage_tbl where id =1
      
    • 在将获取的前后镜像数据插入到undo_log表里面。这张表,每一个分布式的数据库里面都要有。

      CREATE TABLE `undo_log` (
         `id` bigint(20) NOT NULL AUTO_INCREMENT,
         `branch_id` bigint(20) NOT NULL,
         `xid` varchar(100) NOT NULL,
         `context` varchar(128) NOT NULL,
         `rollback_info` longblob NOT NULL,
         `log_status` int(11) NOT NULL,
         `log_created` datetime NOT NULL,
         `log_modified` datetime NOT NULL,
         `ext` varchar(100) DEFAULT NULL,
         PRIMARY KEY (`id`),
         UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
      ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
      
    • 接着,会向Seata-server注册分布式事务

    • 并且申请storage_tbl的表1号记录的全局锁。以防止其他线程会修改1号记录。

    • 提交本地事务,把业务数据和undo_log一起提交到``storage_db`数据库里面。

      以下是rollback的信息:

      {
        "@class": "org.apache.seata.rm.datasource.undo.BranchUndoLog",
        "xid": "10.201.129.159:8091:2873974497041625105",
        "branchId": 2873974497041625106,
        "sqlUndoLogs": ["java.util.ArrayList", [{
          "@class": "org.apache.seata.rm.datasource.undo.SQLUndoLog",
          "sqlType": "UPDATE",
          "tableName": "storage_tbl",
          "beforeImage": {
            "@class": "org.apache.seata.rm.datasource.sql.struct.TableRecords",
            "tableName": "storage_tbl",
            "rows": ["java.util.ArrayList", [{
              "@class": "org.apache.seata.rm.datasource.sql.struct.Row",
              "fields": ["java.util.ArrayList", [{
                "@class": "org.apache.seata.rm.datasource.sql.struct.Field",
                "name": "count",
                "keyType": "NULL",
                "type": 4,
                "value": 98
              }, {
                "@class": "org.apache.seata.rm.datasource.sql.struct.Field",
                "name": "id",
                "keyType": "PRIMARY_KEY",
                "type": 4,
                "value": 1
              }]]
            }]]
          },
          "afterImage": {
            "@class": "org.apache.seata.rm.datasource.sql.struct.TableRecords",
            "tableName": "storage_tbl",
            "rows": ["java.util.ArrayList", [{
              "@class": "org.apache.seata.rm.datasource.sql.struct.Row",
              "fields": ["java.util.ArrayList", [{
                "@class": "org.apache.seata.rm.datasource.sql.struct.Field",
                "name": "count",
                "keyType": "NULL",
                "type": 4,
                "value": 96
              }, {
                "@class": "org.apache.seata.rm.datasource.sql.struct.Field",
                "name": "id",
                "keyType": "PRIMARY_KEY",
                "type": 4,
                "value": 1
              }]]
            }]]
          }
        }]]
      }
      
    • 把提交结果(成功/失败)回报给Seata-server。

  3. 所有的分支事务的提交结果都是成功:给前端响应OK

    • 收到TC的提交请求,立即给前端响应OK
    • 同时添加异步任务,删除undo_log表里的相关记录
  4. 如果分支事务有失败的情况:全部回滚

    • 每一个微服务(RM)会收到TC的回滚请求,会开启一个本地事务处理回滚

      • 找到undo_log表里的相关记录

      • 校验数据。比较后镜像的数据与当前的数据是否一致。如果一致,说明没有被修改过,可以用前镜像的数据回滚。如果不一致,说明被修改过,这就需要相关的配置。

      • 执行回滚之后,删除undo_log表里的相关记录。

  5. 所有事务执行完成之后,以前申请的全局锁才会被释放。

四种事务模式

如何配置Seata的事务模式:

seata:
  data-source-proxy-mode: XA
  1. Seata AT 模式:默认的模式。也就是上面介绍的模式

  2. Seata XA 模式:XA prepare 后就申请锁,分支事务进入阻塞阶段,收到 XA commit 或 XA rollback 前必须阻塞等待。事务资源长时间得不到释放,锁定周期长,而且在应用层上面无法干预,性能差。

  3. Seata TCC 模式:区别于在 AT 模式直接使用数据源代理来屏蔽分布式事务细节,业务方需要自行定义 TCC 资源的“准备”、“提交”和“回滚” 。比如在下方的例子中。(TCC模式需要自己写代码去控制事务的提交与回滚)

    public interface TccActionOne {
        @TwoPhaseBusinessAction(name = "DubboTccActionOne", commitMethod = "commit", rollbackMethod = "rollback")
        public boolean prepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "a") String a);
        public boolean commit(BusinessActionContext actionContext);
        public boolean rollback(BusinessActionContext actionContext);
    }
    
  4. Seata Saga 模式:Saga 模式是 SEATA 提供的长事务解决方案,比如一个事物可能需要2个小时。这个时候就不能用锁去锁住相关表。Saga 模式非常适合与消息队列一起使用,比如下面如果T3发生了异常,就可以发送消息,通知之前的模块,执行对应的回滚策略。在 Saga 模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。


转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 1909773034@qq.com

×

喜欢就点赞,疼爱就打赏