rocketmq-分布式事务解决方案

1. 分布式事务简介

X/Open Distributed Transaction Processing Reference Model 是X/Open这个组织定义的一套分布式事务的标准,也就是定义了规范和API接口,由各个厂商进行具体的实现。这个标准提出了使用二阶段提交2PC (Two-Phase-Commit)来保证分布式事务的完整性。后来J2EE也遵循了X/OpenDTP规范,设计并实现了java里的分布式事务编程接口规范JTA(Java Transaction API)

分布式事务包括事务管理器(Transaction Manager)和一个或多个支持 XA 协议的资源管理器 ( Resource Manager )。

1.1 分布式事务中的角色

在X/OpenDTP事务模型中,定义了三个角色:

  1. AP:application,应用程序,也就是业务层。哪些操作属于一个事务,就是AP定义的
  2. RM: Resource Manager,资源管理器。一般是数据库,也可以是其他资源管理器,比如消息队列, 文件系统
  3. TM: Transaction Manager ,事务管理器、事务协调者,负责接收来自AP发起的XA事务指令,并调度和协调参与事务的所有RM,确保事务正确完成

为什么需要TM这个角色?

在分布式系统中,每一个机器节点虽然都能够明确知道自己在进行事务操作过程中的结果是成功还是失败,但却无法直接获取到其他分布式节点的操作结果。因此当一个事务操作需要跨越多个分布式节点的 时候,为了保持事务处理的ACID特性,就需要引入一个”协调者”来统一调度所有分布式节点的执行逻辑。TM负责调度AP的行为,并最终决定这些AP是否要把各自的事务真正进行提交到RM

1.2 XA协议

XA协议由Tuxedo首先提出的,并交给X/Open组织,作为资源管理器(数据库)与事务管理器的接口标准。目前,Oracle、Informix、DB2和Sybase等各大数据库厂家都提供对XA的支持。XA协议采用两阶段提交方式来管理分布式事务。XA接口提供资源管理器与事务管理器之间进行通信的标准接口。XA接口是双向的系统接口,在事务管理器TM以及多个RM之间形成同心桥梁,XA不能自动提交

XA协议包括两套函数,以xa_开头的及以ax_开头的,这些函数使事务管理器可以对资源管理器进行的操作:

1)xa_open,xa_close:建立和关闭与资源管理器的连接。

2)xa_start,xa_end:开始和结束一个本地事务。

3)xa_prepare,xa_commit,xa_rollback:预提交、提交和回滚一个本地事务。

4)xa_recover:回滚一个已进行预提交的事务。

5)ax_开头的函数使资源管理器可以动态地在事务管理器中进行注册,并可以对XID(TRANSACTION IDS)进行操作。

6)ax_reg,ax_unreg;允许一个资源管理器在一个TMS(TRANSACTION MANAGER SERVER)中动态注册或撤消注册。

看到这里我们应该清楚一些概念:

X/Open 是一个组织,他提出了分布式事务的标准DTP,该标准采用2PC来保证分布式事务的一致性,标准定义了分布式事务中的各种角色(AP,RM,TM),以及他们之间的通信协议XA,该协议是基于2PC的。J2EE遵循DTP标准实现了java里的分布式事务编程接口规范JTA。

1.3 XA/2PC执行过程

XA协议使用了2PC,因此XA需要两阶段提交: prepare 和 commit.

  • 第一阶段为 准备(prepare)阶段。即所有的RM准备执行事务并锁住需要的资源。参与者ready时,向TM报告已准备就绪。

    例如数据库在此阶段会做两件事

    1. 记录事务日志redo、undo
    2. 返回给TM信息ok、error
  • 第二阶段为提交阶段(commit)。当TM确认所有参与者都ready后,向所有RM发送commit命令。 若某个RM返回no,则TM向所有RM发送rollback命令。

存在问题

如果第一阶段完成后TM宕机或网络出现故障,导致TM无法发送commit/Rollback指令,此时RM会一直阻塞发生死锁。

或者某一个RM在第一阶段一直没有回复TM,TM会一直阻塞等待。

这都是因为2PC没有timeout机制导致的问题

XA性能局限性

  1. 效率低下,准备阶段的成本持久,全局事务状态的成本持久,性能与本地事务相差10倍左右;
  2. 提交前,出现故障难以恢复和隔离问题。

2. 理论支持

2.1 CAP理论

CAP的含义:

  • C:Consistency 一致性 同一数据的多个副本是否实时相同。
  • A:Availability 可用性:一定时间内 & 系统返回一个明确的结果 则称为该系统可用。
  • P:Partition tolerance 分区容错性 将同一服务分布在多个系统中,从而保证某一个系统宕机, 仍然有其他系统提供相同的服务。

在分布式系统中,由于网络的不确定性,分区容错性是一个健壮的系统必须要保证的性能。一致性和可用性则需要根据具体的业务进行选择。CAP理论告诉我们,在分布式系统中,C、A、P三个条件中我们最多只能选择两个,P必选的前提下,要么选择AP,要么选择CP。

对于一个业务系统来说,可用性和分区容错性是必须要满足的两个条件,并且这两者是相辅相成的。业务系统之所以使用分布式系统,主要原因有两个:

  • 提升整体性能 当业务量猛增,单个服务器已经无法满足我们的业务需求的时候,就需要使用分布式系统,使用多个节点提供相同的功能,从而整体上提升系统的性能,这就是使用分布式系统的第一个原因。
  • 实现分区容错性 单一节点 或 多个节点处于相同的网络环境下,那么会存在一定的风险,万一该 机房断电、该地区发生自然灾害,那么业务系统就全面瘫痪了。为了防止这一问题,采用分布式系 统,将多个子系统分布在不同的地域、不同的机房中,从而保证系统高可用性。

这说明分区容错性是分布式系统的根本,如果分区容错性不能满足,那使用分布式系统将失去意义。

此外,可用性对业务系统也尤为重要。在大谈用户体验的今天,如果业务系统时常出现“系统异常”、响 应时间过长等情况,这使得用户对系统的好感度大打折扣,在互联网行业竞争激烈的今天,相同领域的 竞争者不甚枚举,系统的间歇性不可用会立马导致用户流向竞争对手。因此,我们只能通过牺牲一致性 来换取系统的可用性分区容错

2.2 BASE理论

CAP理论告诉我们一个悲惨但不得不接受的事实——我们只能在C、A、P中选择两个条件。而对于业务 系统而言,我们往往选择牺牲一致性来换取系统的可用性和分区容错性。不过这里要指出的是,所谓 的“牺牲一致性”并不是完全放弃数据一致性,而是牺牲强一致性换取弱一致性

  • BA:Basic Available 基本可用

    • 整个系统在某些不可抗力的情况下,仍然能够保证“可用性”,即一定时间内仍然能够返回一个明确的结果。只不过“基本可用”和“高可用”的区别是:
      • “一定时间”可以适当延长 当举行大促时,响应时间可以适当延长
      • 给部分用户返回一个降级页面,从而缓解服务器压 力。但要注意,返回降级页面仍然是返回明确结果。
  • S:Soft State:柔性状态 同一数据的不同副本的状态,可以不需要实时一致。

  • E:Eventual Consisstency:最终一致性 同一数据的不同副本的状态,可以不需要实时一致,但 一定要保证经过一定时间后仍然是一致的。

3. 分布式事务常见解决方案

3.1 TCC两阶段补偿方案

TCC,Try-Confirm-Cancel是一种在业务层实现分布式事务的方案,能够提供强一致性的保证。微服务间的同步调用, 比如转账场景,扣款与收款需要同时成功,这种有强一致性要求的场景可以使用TCC来完成。

TCC 是服务化的二阶段编程模型,采用的补偿机制,而XA事务采用的是回滚机制。

下面以下单流程为例介绍一下TCC的工作流程,订单服务需要生成订单记录,修改订单支付状态–>调用库存服务扣减库存–>调用积分服务增加会员积分–>调用仓储服务生成销售出库单。

Try阶段

完成所有业务检查(一致性),预留业务资源(准隔离性)

首先,上面订单服务先把自己的状态修改为:OrderStatus.UPDATING。

这是啥意思呢?也就是说,在 pay() 那个方法里,你别直接把订单状态修改为已支付啊!你先把订单状态修改为 UPDATING,也就是修改中的意思。

这个状态是个没有任何含义的这么一个状态,代表有人正在修改这个状态罢了。

然后呢,库存服务直接提供的那个 reduceStock() 接口里,也别直接扣减库存啊,你可以是冻结掉库存。

举个例子,本来你的库存数量是 100,你别直接 100 - 2 = 98,扣减这个库存!

你可以把可销售的库存:100 - 2 = 98,设置为 98 没问题,然后在一个单独的冻结库存的字段里,设置一个 2。也就是说,有 2 个库存是给冻结了。

积分服务的 addCredit() 接口也是同理,别直接给用户增加会员积分。你可以先在积分表里的一个预增加积分字段加入积分。

比如:用户积分原本是 1190,现在要增加 10 个积分,别直接 1190 + 10 = 1200 个积分啊!

你可以保持积分为 1190 不变,在一个预增加字段里,比如说 prepare_add_credit 字段,设置一个 10,表示有 10 个积分准备增加。

仓储服务的 saleDelivery() 接口也是同理啊,你可以先创建一个销售出库单,但是这个销售出库单的状态是“UNKNOWN”。

也就是说,刚刚创建这个销售出库单,此时还不确定它的状态是什么呢!

上面这套改造接口的过程,其实就是所谓的 TCC 分布式事务中的第一个 T 字母代表的阶段,也就是 Try 阶段。

Confirm阶段

确认执行业务操作,不做任何业务检查,只使用Try阶段预留的业务资源。

然后就分成两种情况了,第一种情况是比较理想的,那就是各个服务执行自己的那个 Try 操作,都执行成功。

这个时候,就需要依靠 TCC 分布式事务框架来推动后续的执行了。这里简单提一句,如果你要玩儿 TCC 分布式事务,必须引入一款 TCC 分布式事务框架,比如国内开源的 ByteTCC、Himly、TCC-transaction。

否则的话,感知各个阶段的执行情况以及推进执行下一个阶段的这些事情,不太可能自己手写实现,太复杂了。

如果你在各个服务里引入了一个 TCC 分布式事务的框架,订单服务里内嵌的那个 TCC 分布式事务框架可以感知到,各个服务的 Try 操作都成功了。

此时,TCC 分布式事务框架会控制进入 TCC 下一个阶段,第一个 C 阶段,也就是 Confirm 阶段。

为了实现这个阶段,你需要在各个服务里再加入一些代码。比如说,订单服务里,你可以加入一个 Confirm 的逻辑,就是正式把订单的状态设置为“已支付”了,大概是类似下面这样子:

1
2
3
4
5
public class OrderServiceConfirm {
public void pay(){
orderDao.updateStatus(OrderStatus.PAYED);
}
}

库存服务也是类似的,你可以有一个 InventoryServiceConfirm 类,里面提供一个 reduceStock() 接口的 Confirm 逻辑,这里就是将之前冻结库存字段的 2 个库存扣掉变为 0。

这样的话,可销售库存之前就已经变为 98 了,现在冻结的 2 个库存也没了,那就正式完成了库存的扣减。

积分服务也是类似的,可以在积分服务里提供一个 CreditServiceConfirm 类,里面有一个 addCredit() 接口的 Confirm 逻辑,就是将预增加字段的 10 个积分扣掉,然后加入实际的会员积分字段中,从 1190 变为 1120。

仓储服务也是类似,可以在仓储服务中提供一个 WmsServiceConfirm 类,提供一个 saleDelivery() 接口的 Confirm 逻辑,将销售出库单的状态正式修改为“已创建”,可以供仓储管理人员查看和使用,而不是停留在之前的中间状态“UNKNOWN”了。

好了,上面各种服务的 Confirm 的逻辑都实现好了,一旦订单服务里面的 TCC 分布式事务框架感知到各个服务的 Try 阶段都成功了以后,就会执行各个服务的 Confirm 逻辑。

订单服务内的 TCC 事务框架会负责跟其他各个服务内的 TCC 事务框架进行通信,依次调用各个服务的 Confirm 逻辑。然后,正式完成各个服务的所有业务逻辑的执行。

Cancel阶段

取消Try阶段预留的业务资源。Try阶段出现异常时,取消所有业务资源预留请求

那如果是异常的一种情况呢?

举个例子:在 Try 阶段,比如积分服务吧,它执行出错了,此时会怎么样?

那订单服务内的 TCC 事务框架是可以感知到的,然后它会决定对整个 TCC 分布式事务进行回滚。

也就是说,会执行各个服务的第二个 C 阶段,Cancel 阶段。同样,为了实现这个 Cancel 阶段,各个服务还得加一些代码。

首先订单服务,它得提供一个 OrderServiceCancel 的类,在里面有一个 pay() 接口的 Cancel 逻辑,就是可以将订单的状态设置为“CANCELED”,也就是这个订单的状态是已取消。

库存服务也是同理,可以提供 reduceStock() 的 Cancel 逻辑,就是将冻结库存扣减掉 2,加回到可销售库存里去,98 + 2 = 100。

积分服务也需要提供 addCredit() 接口的 Cancel 逻辑,将预增加积分字段的 10 个积分扣减掉。

仓储服务也需要提供一个 saleDelivery() 接口的 Cancel 逻辑,将销售出库单的状态修改为“CANCELED”设置为已取消。

然后这个时候,订单服务的 TCC 分布式事务框架只要感知到了任何一个服务的 Try 逻辑失败了,就会跟各个服务内的 TCC 分布式事务框架进行通信,然后调用各个服务的 Cancel 逻辑。

总结

总结一下,你要玩儿 TCC 分布式事务的话:首先需要选择某种 TCC 分布式事务框架,各个服务里就会有这个 TCC 分布式事务框架在运行。

然后你原本的一个接口,要改造为 3 个逻辑,Try-Confirm-Cancel:

  • 先是服务调用链路依次执行 Try 逻辑。
  • 如果都正常的话,TCC 分布式事务框架推进执行 Confirm 逻辑,完成整个事务。
  • 如果某个服务的 Try 逻辑有问题,TCC 分布式事务框架感知到之后就会推进执行各个服务的 Cancel 逻辑,撤销之前执行的各种操作。

这就是所谓的 TCC 分布式事务。TCC 分布式事务的核心思想,说白了,就是当遇到下面这些情况时:

  • 某个服务的数据库宕机了。
  • 某个服务自己挂了。
  • 那个服务的 Redis、Elasticsearch、MQ 等基础设施故障了。
  • 某些资源不足了,比如说库存不够这些。

先来 Try 一下,不要把业务逻辑完成,先试试看,看各个服务能不能基本正常运转,能不能先冻结我需要的资源。

如果 Try 都 OK,也就是说,底层的数据库、Redis、Elasticsearch、MQ 都是可以写入数据的,并且你保留好了需要使用的一些资源(比如冻结了一部分库存)。

接着,再执行各个服务的 Confirm 逻辑,基本上 Confirm 就可以很大概率保证一个分布式事务的完成了。

那如果 Try 阶段某个服务就失败了,比如说底层的数据库挂了,或者 Redis 挂了,等等。

此时就自动执行各个服务的 Cancel 逻辑,把之前的 Try 逻辑都回滚,所有服务都不要执行任何设计的业务逻辑。保证大家要么一起成功,要么一起失败。

如果有一些意外的情况发生了,比如说订单服务突然挂了,然后再次重启,TCC 分布式事务框架是如何保证之前没执行完的分布式事务继续执行的呢?

所以,TCC 事务框架都是要记录一些分布式事务的活动日志的,可以在磁盘上的日志文件里记录,也可以在数据库里记录。保存下来分布式事务运行的各个阶段和状态。

万一某个服务的 Cancel 或者 Confirm 逻辑执行一直失败怎么办呢?

TCC 事务框架会通过活动日志记录各个服务的状态。举个例子,比如发现某个服务的 Cancel 或者 Confirm 一直没成功,会不停的重试调用它的 Cancel 或者 Confirm 逻辑,务必要它成功!

3.2 最大努力通知方案

最大努力通知方案主要也是借助MQ消息系统来进行事务控制,这一点与可靠消息最终一致方案一样。它是比较简单的分布式事务方案,它本质上就是通过定期校对,实现数据一致性。

一、最大努力通知方案的实现

  1. 业务活动的主动方,在完成业务处理之后,向业务活动的被动方发送消息,允许消息丢失。
  2. 主动方可以设置时间阶梯型通知规则,在通知失败后按规则重复通知,直到通知N次后不再通知。
  3. 主动方提供校对查询接口给被动方按需校对查询,用于恢复丢失的业务消息。
  4. 业务活动的被动方如果正常接收了数据,就正常返回响应,并结束事务。
  5. 如果被动方没有正常接收,根据定时策略,向业务活动主动方查询,恢复丢失的业务消息。

二、最大努力通知方案的特点

  1. 用到的服务模式:可查询操作、幂等操作。
  2. 被动方的处理结果不影响主动方的处理结果;
  3. 适用于对业务最终一致性的时间敏感度低的系统;
  4. 适合跨企业的系统间的操作,或者企业内部比较独立的系统间的操作,比如银行通知、商户通知等

举例说明:短信服务场景

在登录、注册、验证场景,业务节点需要通过短信供应商发送验证短信给用户,用户通过短信验证码进行登录。

在这个场景中,短信服务供应商是业务主动方,服务端业务是被动方,需要等待主动方的通知

事务体现:服务端的登录和请求短信服务这两个操应该是一组

再一个例子就是:支付服务场景

支付宝、微信为支付服务的主动方

接入支付服务的应用为被动方

3.3 基于事务消息的最终一致方案

RocketMQ的TransactionProducer(事务消息)

RocketMQ和其他消息中间件最大的一个区别是支持了事务消息,这也是分布式事务里面的基于消息的最终一致性方案

3.3.1 RocketMQ消息的事务架构设计
  1. 生产者执行本地事务,修改订单支付状态,并且提交事务
  2. 生产者发送事务消息到broker上,消息发送到broker上在没有确认之前,消息对于consumer是不可见状态
  3. 生产者确认事务消息,使得发送到broker上的事务消息对于消费者可见
  4. 消费者获取到消息进行消费,消费完之后执行ack进行确认
  5. 这里可能会存在一个问题,生产者本地事务成功后,发送事务确认消息到broker上失败了怎么办?这个时候意味着消费者无法正常消费到这个消息。所以RocketMQ提供了消息回查机制,如果事务消息一直处于中间状态,broker会发起重试去查询broker上这个事务的处理状态。一旦发现 事务处理成功,则把当前这条消息设置为可见

注意:本地事务提交之后就不能再回滚了,保证最终一致性的分布式事务中,当下游业务出现异常,要做的不是去回滚上游的业务,而是重试出现异常的业务,直到成功为止,保证业务的最终一致性。

3.3.2 事务消息的实践

通过一个下单以后扣减库存的数据一致性场景来演示RocketMQ的分布式事务特性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, InterruptedException {
TransactionMQProducer transactionProducer = new TransactionMQProducer("tx_producer_group");
transactionProducer.setNamesrvAddr("10.0.12.76:9876");
ExecutorService executorService = Executors.newFixedThreadPool(8);
transactionProducer.setExecutorService(executorService);
transactionProducer.setTransactionListener(new TransactionListenerLocal());
transactionProducer.start();
for(int i=0;i<20;i++) {
String orderId= UUID.randomUUID().toString();
String body="{'operation':'doOrder','orderId':'"+orderId+"'}";
Message message = new Message("pay_tx_topic", "TagA",orderId,
body.getBytes(RemotingHelper.DEFAULT_CHARSET));
transactionProducer.sendMessageInTransaction(message,
orderId+"&"+i);
Thread.sleep(1000);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class TransactionListenerLocal implements TransactionListener {
private static final Map<String,Boolean> results=new ConcurrentHashMap<>();
/**
* 执行本地事务
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println(":执行本地事务:"+arg.toString()); String orderId=arg.toString();
boolean rs = saveOrder(orderId);//模拟数据入库操作
return rs ? LocalTransactionState.COMMIT_MESSAGE:LocalTransactionState.UNKNOW;
// 这个返回状态表示告诉broker这个事务消息是否被确认,允许给到consumer进行消费
// LocalTransactionState.ROLLBACK_MESSAGE 回滚
// LocalTransactionState.UNKNOW 未知
}

private boolean saveOrder(String orderId) {
//如果订单取模等于0,表示成功,否则表示失败
boolean success=Math.abs(Objects.hash(orderId))%2==0;
results.put(orderId,success);
return success;
}

/**
* 提供事务执行状态的回查方法,提供给broker回调
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String orderId=msg.getKeys();
System.out.println("执行事务执行状态的回查,orderId:"+orderId);
boolean rs=Boolean.TRUE.equals(results.get(orderId));
System.out.println("回调:"+rs);
return rs?LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.ROLLBACK_MESSAGE;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class TransactionConsumer {
public static void main(String[] args) throws MQClientException, IOException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("tx_consumer_group");
defaultMQPushConsumer.setNamesrvAddr("10.0.12.76:9876");
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
defaultMQPushConsumer.subscribe("pay_tx_topic","*");
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently)
(msgs, context) -> {
msgs.stream().forEach(messageExt -> {
try {
String orderId=messageExt.getKeys();
String body=new String(messageExt.getBody(),
RemotingHelper.DEFAULT_CHARSET);
System.out.println("收到消息:"+body+",开始扣减库存:"+orderId);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} });
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
defaultMQPushConsumer.start();
System.in.read();
}
}
3.3.3 RocketMQ事务消息的三种状态
  1. ROLLBACK_MESSAGE:回滚事务
  2. COMMIT_MESSAGE:提交事务
  3. UNKNOW: broker会定时的回查Producer消息状态,直到彻底成功或失败。

executeLocalTransaction方法返回ROLLBACK_MESSAGE时,表示直接回滚事务

当返回 COMMIT_MESSAGE提交事务

当返回UNKNOW时,Broker会在一段时间之后回查checkLocalTransaction,根据 checkLocalTransaction返回状态执行事务的操作(回滚或提交)。

如示例中,当返回ROLLBACK_MESSAGE时消费者不会收到消息,且不会调用回查函数,当返回 COMMIT_MESSAGE时事务提交,消费者收到消息,当返回UNKNOW时,在一段时间之后调用回查函 数,并根据status判断返回提交或回滚状态,返回提交状态的消息将会被消费者消费,所以此时消费者 可以消费部分消息