使用 Seata 处理分布式事务

8 minute

分布式事务

​分布式系统会把一个应用系统拆分为可独立部署的多个服务,因此需要服务与服务之间远程协作才能完成事务操作。

这种分布式系统环境下由不同的服务之间通过网络远程协作完成事务称之为分布式事务。

例如用户下一个订单,需要首先创建订单,然后删减库存,接着扣除用户的金钱,最后完成订单。这个过程中,每个操作都可以作为一个微服务,每个微服务操作对应的数据库,而各个数据库可能分布在不同机器上,那么分布式事务就产生了,我们要确保一个事务被正确地处理,必须解决好分布式事务的数据提交与回滚。

Seata

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。(来源官网)

  • TC (Transaction Coordinator) - 事务协调者:

    维护全局和分支事务的状态,驱动全局事务提交或回滚。

  • TM (Transaction Manager) - 事务管理器:

    定义全局事务的范围:开始全局事务、提交或回滚全局事务。

  • RM (Resource Manager) - 资源管理器:

    管理分支事务处理的资源,与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

编码测试

架构说明

开启三个微服务,创建订单服务,删减库存服务,扣除用户金钱服务,均注册到 nacos。

由订单服务作为入口,首先创建订单,然后删减库存,最后扣钱,完成订单,各个服务数据存取操作均处于不同数据库中。

使用 seata 作为分布式事务解决方案,也注册到 nacos 中。

初始化 Seata

下载安装 Seata 1.0.0,然后在 MySQL 新建 seata 表:sql 链接

配置 Seata

配置 type 类型为 nacos:

 1# registry.conf
 2registry {
 3  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
 4  type = "nacos"
 5
 6  nacos {
 7    serverAddr = "localhost:8848"
 8    namespace = ""
 9    cluster = "default"
10  }
11...
12}

my_test_tx_group 修改为自定义 group 名,store.mode 改为 db,修改 db 配置内容:

 1# file.conf
 2service {
 3  #transaction service group mapping
 4  vgroup_mapping.jzh = "default"
 5  #only support when registry.type=file, please don't set multiple addresses
 6  default.grouplist = "127.0.0.1:8091"
 7  #disable seata
 8  disableGlobalTransaction = false
 9}
10
11## transaction log store, only used in seata-server
12store {
13  ## store mode: file、db
14  mode = "db"
15
16  ## file store property
17  file {
18    ## store location dir
19    dir = "sessionStore"
20  }
21
22  ## database store property
23  db {
24    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
25    datasource = "dbcp"
26    ## mysql/oracle/h2/oceanbase etc.
27    db-type = "mysql"
28    driver-class-name = "com.mysql.jdbc.Driver"
29    url = "jdbc:mysql://127.0.0.1:3306/seata"
30    user = "root"
31    password = "******"
32  }
33}

启动 Seata

1bin\seata-server.bat -p 8091 -h 127.0.0.1 -m db

编写微服务模块

创建数据库表

创建 seata_order,seata_account,seata_storage 三个数据库,创建对应数据表,以及 undo_log 表:数据库脚本地址

微服务配置

将 file.conf 和 registry.conf 复制到微服务对应配置文件目录下。

在 application.yaml 指明自定义的服务组。

1spring:
2  cloud:
3    alibaba:
4      seata:
5        tx-service-group: jzh

导入 seata 依赖(指定好自己用的 seata 版本):

 1<dependency>
 2    <groupId>com.alibaba.cloud</groupId>
 3    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
 4    <exclusions>
 5        <exclusion>
 6            <artifactId>seata-all</artifactId>
 7            <groupId>io.seata</groupId>
 8        </exclusion>
 9    </exclusions>
10</dependency>
11<dependency>
12    <groupId>io.seata</groupId>
13    <artifactId>seata-all</artifactId>
14    <version>1.0.0</version>
15</dependency>

多数据源配置

seata 需要处理多个数据源,因此必须配置多数据源,然而 DataSourceAutoConfiguration.class 默认会帮我们自动配置单数据源,所以必须排除它。

启动类添加如下注解即可:

1@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)

同时指定多数据源配置如下:

 1@Configuration
 2public class DataSourceProxyConfig {
 3    @Bean
 4    @ConfigurationProperties(prefix = "spring.datasource")
 5    public DataSource druidDataSource() {
 6        return new DruidDataSource();
 7    }
 8
 9    @Bean
10    public DataSourceProxy dataSourceProxy(DataSource dataSource) {
11        return new DataSourceProxy(dataSource);
12    }
13
14    @Bean
15    public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
16        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
17        sqlSessionFactoryBean.setDataSource(dataSourceProxy);
18        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/*.xml"));
19        sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
20        return sqlSessionFactoryBean.getObject();
21    }
22}

微服务代码编写

订单服务具体实现:

 1@Service
 2@Slf4j
 3public class OrderServiceImpl implements OrderService {
 4    @Override
 5    @GlobalTransactional(name = "jzh-create", rollbackFor = Exception.class)
 6    public void create(Order order) {
 7        log.info("************开始创建订单");
 8        orderDao.create(order);
 9
10        log.info("************开始扣库存");
11        storageService.decrease(order.getProductId(), order.getCount());
12        log.info("************完成扣库存");
13
14        log.info("************开始扣钱");
15        accountService.decrease(order.getUserId(), order.getMoney());
16        log.info("************扣钱完成");
17
18        orderDao.update(order.getUserId(), 0);
19        log.info("************订单完成");
20    }
21
22    @Resource
23    private OrderDao orderDao;
24    @Resource
25    private StorageService storageService;
26    @Resource
27    private AccountService accountService;
28}

rollbackFor 指定为任何异常发生都回滚。

其他两个微服务通过 feign 调用:

 1// AccountService.java
 2
 3@FeignClient(value = "seata-account-service")
 4public interface AccountService {
 5    @PostMapping(value = "/account/decrease")
 6    CommonResult decrease(@RequestParam("userId") Long userId,
 7                          @RequestParam("money") BigDecimal money);
 8}
 9
10// StorageService.java
11@FeignClient(value = "seata-storage-service")
12public interface StorageService {
13    @PostMapping(value = "/storage/decrease")
14    CommonResult decrease(@RequestParam("productId") Long productId,
15                          @RequestParam("count") Integer count);
16}

通过 /order/create 接口请求服务:

 1// OrderController.java
 2
 3@RestController
 4@Slf4j
 5public class OrderController {
 6    @Resource
 7    private OrderService orderService;
 8
 9    @PostMapping("/order/create")
10    public CommonResult create(@RequestBody Order order) {
11        log.info(order.toString());
12        orderService.create(order);
13        return new CommonResult(200, "订单创建成功");
14    }
15}

测试

  1. 访问 localhost:8848/nacos,发现三个微服务和 seata 服务都已经注册成功;
  2. 发送 localhost:2001/order/create 请求(POST,请求体为订单信息),返回 200,查看数据库表,数据变更正确;
  3. 手动给 Account 服务添加异常,重复如上请求,返回 500,后台报异常,查看库存数据库表,发现存量没有变化,证明回滚成功。