背景
- 从支付宝转账1万块钱到余额宝,支付宝扣除1万之后,如果系统挂掉怎么办,这时余额宝账户并没有增加1万,数据就会出现不一致状况了。
- 在电商系统中,当有用户下单后,除了在订单表插入一条记录外,对应商品表的商品数量必须减1,怎么保证?
- 在广告系统中,当用户点击某广告后,除了在点击事件表中增加一条记录外,还得去商家账户表中找到这个商家并扣除广告费,怎么保证?
这些问题本质上都可以抽象为:当一个表数据更新后,怎么保证另一个表的数据也必须要更新成功。
思考
- 如果系统规模较小,数据表都在一个数据库实例上,本地事务方式可以很好地运行。
- 但是如果系统规模较大,比如支付宝账户表和余额宝账户表显然不会在同一个数据库实例上,他们往往分布在不同的物理节点上,这时本地事务已经失去用武之地。
分布式事务
两阶段提交协议
两阶段提交协议(Two-phase Commit,2PC)经常被用来实现分布式事务。一般分为协调器TC和若干事务执行者SI两种角色,这里的事务执行者就是具体的数据库,协调器可以和事务执行器在一台机器上。
TC或Si为什么要把发送或接收到的消息先写到本地日志里?
- 主要是为了故障后恢复用。
- 如某一Si从故障中恢复过来,先检查本机的日志,如果已收到commit则提交,如果abort则回滚。如果是yes,则再向TC询问一下,确定下一步。
- 如果什么都没有,则很可能在prepare阶段Si就崩溃了,因此需要回滚。
怎么实现?
- 手工代码实现(复杂度高)
- 采用第三方框架,例如atomikos
两阶段提交的性能问题
- 实践表明二阶段提交的性能实在是太差,根本不适合高并发的系统。主要由两方面原因导致:
- 两阶段提交涉及多次节点间的网络通信,导致通信时间太长。
- 事务时间相对变长很多,对锁定的资源的时间也变长了,造成资源等待时间也增加很多。
正是由于分布式事务存在很严重的性能问题,大部分高并发服务都在避免使用,往往通过其他途径来解决数据一致性问题。
消息队列
在生活中我们常见一些场景,以在九方商城的喜茶店买饮料为例来说明。
告诉服务员你想买的饮料的名字,拿出手机付完款,对方并不会直接把饮料给你,而是给你一张小票,然后让你拿着小票到出货区排队去取。
- 为什么他们要将付钱和取货两个动作分开呢?
- 只要这张小票在,你最终是能拿到饮料的。
- 同理转账服务也是如此,当支付宝账户扣除1万后,我们只要生成一个凭证(消息)即可,这个凭证(消息)上写着“让余额宝账户增加 1万”,只要这个凭证(消息)能可靠保存,我们最终是可以拿着这个凭证(消息)让余额宝账户增加1万的,即我们能依靠这个凭证(消息)完成最终一致性。
如何可靠保存凭证(消息)
业务与消息耦合的方式(依赖本地事务)
支付宝在完成扣款的同时,同时记录消息数据,这个消息数据与业务数据保存在同一数据库实例里(消息记录表表名为message);
1 | Begin transaction |
- 上述事务能保证只要支付宝账户里被扣了钱,消息一定能保存下来。
- 当上述事务提交成功后,我们通过消息队列服务将此消息通知余额宝,余额宝处理成功后发送回复成功消息,支付宝收到回复后删除该条消息数据。
业务与消息解耦方式
第一种方式在保存消息的时候,使得消息数据和业务数据紧耦合在一起,从架构上看不够优雅,而且容易诱发其他问题。为了解耦可以采用以下方式。
- 支付宝在扣款事务提交之前,向实时消息服务请求发送消息,实时消息服务只记录消息数据,而不真正发送。当消息发送成功(发出去)后才会提交事务;
- 当支付宝扣款事务被提交成功后,向实时消息服务确认发送。只有在得到确认发送指令后,实时消息服务才真正发送该消息;
- 当支付宝扣款事务提交失败回滚后,向实时消息服务取消发送。在得到取消发送指令后,该消息将不会被发送;
- 对于那些未确认的消息或者取消的消息,需要有一个消息状态确认系统定时去支付宝系统查询这个消息的状态并进行更新。
- 优点:消息数据独立存储,降低业务系统与消息系统间的耦合;
- 缺点:一次消息发送需要两次请求;业务处理服务需要实现消息状态回查接口。
目前支持事务消息的消息队列只有淘宝开源的RocketMQ
如何解决消息重复投递的问题
以我们支付宝转账到余额宝为例,如果相同的消息被重复投递两次,那么我们余额宝账户将会增加2万而不是1万了。
- 为什么相同的消息会被重复投递?
比如余额宝处理完消息msg后,发送了处理成功的消息给支付宝,正常情况下支付宝应该要删除消息msg,但如果支付宝这时候悲剧的挂了,重启后一看消息msg还在,就会继续发送消息msg。 - 解决方法
在余额宝这边增加消息应用状态表(message_apply),通俗来说就是个账本,用于记录消息的消费情况,每次来一个消息,在真正执行之前,先去消息应用状态表中查询一遍,如果找到说明是重复消息,丢弃即可,如果没找到才执行,同时插入到消息应用状态表(同一事务)。
如何解决消息丢失导致业务不完整的问题
- 下一个节点系统在成功处理完消息之后可以主动通知本节点
- 本节点可以主动查询没有完成确认的事务消息,8次间隔查询策略:
4m
10m
10m
1h
2h
6h
15h
24h
Paxos算法
Paxos算法是莱斯利·兰伯特(Leslie Lamport,就是 LaTeX 中的”La”,此人现在在微软研究院)于1990年提出的一种基于消息传递的一致性算法。由于算法难以理解起初并没有引起人们的重视,使Lamport在八年后1998年重新发表到ACM Transactions on Computer Systems上(The Part-Time Parliament)。即便如此paxos算法还是没有得到重视,2001年Lamport 觉得同行无法接受他的幽默感,于是用容易接受的方法重新表述了一遍(Paxos Made Simple)。可见Lamport对Paxos算法情有独钟。近几年Paxos算法的普遍使用也证明它在分布式一致性算法中的重要地位。2006年Google的三篇论文初现“云”的端倪,其中的Chubby Lock服务使用Paxos作为Chubby Cell中的一致性算法,Paxos的人气从此一路狂飙。(Lamport 本人在 他的blog 中描写了他用9年时间发表这个算法的前前后后)
简单说来,Paxos的目的是让整个集群的结点对某个值的变更达成一致。Paxos算法基本上来说是个民主选举的算法——大多数的决定会成个整个集群的统一决定。任何一个点都可以提出要修改某个数据的提案,是否通过这个提案取决于这个集群中是否有超过半数的结点同意(所以Paxos算法需要集群中的结点是单数)。
基于本地事务+消息队列的解决方案示例
对于我们来讲比较可行的方案是采用本地事务+消息队列的方式解决分布式事务问题,在下面我将对方案作简单的介绍,演示的是如何在两个独立的系统(A和B)间实际转账业务。
数据库表结构设计
- 在实际设计时要依照具体的业务来,在这里由于是演示转账,所以需要一张账户名记录用户的余额等信息。
1
2
3
4
5
6CREATE TABLE `account` (
`id` int(10) unsigned NOT NULL COMMENT '编号',
`name` varchar(45) COLLATE utf8_bin DEFAULT NULL COMMENT '用户标识',
`amount` int(11) DEFAULT NULL COMMENT '余额',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin - 事务消息表,记录转账这个业务操作,相当于一张凭证。另一个系统拿到此凭证后可以进行后续处理,以最终完成转账业务。前面三张字段是固定的,后面的字段依照具体的业务来设计。建议使用
ts_msg_
作为事务消息表的前缀,以区分不同的表类型。1
2
3
4
5
6
7
8
9
10CREATE TABLE `ts_msg_transfer` (
`tsId` varchar(24) COLLATE utf8_bin NOT NULL COMMENT '事务编号',
`tsCreateAt` datetime NOT NULL COMMENT '事务创建时间',
`tsState` tinyint(4) NOT NULL COMMENT '事务状态:0进行中、1完成',
`transactionAt` datetime DEFAULT NULL COMMENT '交易时间',
`userId` varchar(45) COLLATE utf8_bin DEFAULT NULL COMMENT '转账接收用户编号',
`balance` int(11) DEFAULT NULL COMMENT '转账金额',
PRIMARY KEY (`tsId`),
KEY `index_tsState` (`tsState`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='转账业务表' - 统这边需要有一张记录事务处理结果的表,用来防止重复处理事务消息。所有事务消息的确认表用同一张即可,不用按照业务进行区分。考虑到后续此表规模增加较快,可以提前做好水平分表的设计。
1
2
3
4
5CREATE TABLE `ts_msg_apply` (
`tsId` varchar(24) COLLATE utf8_bin NOT NULL COMMENT '事务编号',
`tsDoneAt` datetime DEFAULT NULL COMMENT '事务消息处理完成时间',
PRIMARY KEY (`tsId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='事务消息成功处理完后插入到此表,防止本条消息被业务重复处理。'代码示例
在内部分享时面向php程序员,所以下面以php代码作为示例,其他语言开发者只要理解示例的意图就可以了,相信转换成其它语言并不是什么难事。
- A这边通过本地事务保证所有操作要么全部成功,要么全部失败,以保证业务一致性。可以看到从账户扣减金额和记录事务以及发送消息队列的操作全部在同一个本地事务内,任何一个步骤出错都将被捕捉到并进行回滚。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37public static function messageTransfer(TransferForm $form): bool
{
$db = Yii::$app->getDb();
$ts = $db->beginTransaction();
try {
$fromName = '张三';
$toName = '李四';
if ($form->type != 0) {
$fromName = '李四';
$toName = '张三';
}
//第一件事:从要转账用户的账户上扣减转账金额
AccountDAO::debit($fromName, $form->amount);
//第二件事:此时往被转账用户的账户上加款是异步的,并且这个业务的处理不在本应用内。
//第1件小事:记录本地事务
$tsMsg = new TsMsgTransfer();
$tsMsg->transactionAt = date('Y-m-d H:i:s');
$tsMsg->userId = $toName;
$tsMsg->balance = $form->amount;
TsMsgTransferDAO::record($tsMsg);
//第2件小事:发送消息到队列
TencentQueueUtil::sendMessage('queue-test-dev', json_encode($tsMsg));
$ts->commit();
} catch (\Exception $e) {
Yii::error('异步转账业务操作失败' . $e->getMessage(), __METHOD__);
$ts->rollBack();
return false;
}
return true;
} - B的处理是接收消息队列的数据,并在本地事务中进行处理,从而保证业务的一致性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40public static function handleMessageTransfer()
{
//批量接受消息
$msgAry = TencentQueueUtil::batchReceiveMessage('queue-test-dev', 16);
//逐条处理每条消息
foreach ($msgAry as $msg) {
$msgObj = json_decode($msg->msgBody);
$db = Yii::$app->getDb();
$ts = $db->beginTransaction();
try {
$tsMsgApply = new TsMsgApply();
$tsMsgApply->tsId = $msgObj->tsId;
//判断消息是否已经被处理过
if (TsMsgApplyDAO::isRepeated($tsMsgApply)) {
TencentQueueUtil::deleteMessage('queue-test-dev', $msg->receiptHandle);
continue;
}
//处理用户加款业务
AccountDAO::borrow($msgObj->userId, $msgObj->balance);
//记录这件事情已经做了
TsMsgApplyDAO::done($tsMsgApply);
//删除队列中的消息,防止后续重复处理。
TencentQueueUtil::deleteMessage('queue-test-dev', $msg->receiptHandle);
$ts->commit();
} catch (\Exception $e) {
Yii::error($e->getMessage(), __METHOD__);
$ts->rollBack();
}
//同步通知上一节点系统,我成功处理这条消息了。
Factory::getRpcInterface->confirm($tsId);
}
}总结
上面进行了表结构设计和伪码示例,为了方便更好的理解,下面作一个总结。
- A这边处理时,虽然把所有业务操作都放到了一个本地事务中,但仍然有一个潜在的问题,那就是消息队列发送成功,但是最终提交本地事务的时候,数据库不能服务了,导致提交失败,但此时消息队列中已经有消息了。要解决这个问题也比较简单,在B系统这边对收到的每条消息在处理时通过RPC方式,拿着事务编号向A系统进行询问,如果存在此事物则进行处理,没有的话就抛弃这条消息。
- 息队列中的消息如果丢失了怎么办?在系统A这边应该有相应的检查策略,发现有事务消息长时间没有被处理,此时应该再次将它发送到消息队列,所以必须有这么一个补偿机制。
- 如果消息存在重新投递,那系统B如何应付重复的消息?我们在设计表结构时,有一张防重表
ts_msg_apply
,它的重要作用在于B系统成功处理完一条消息,就要记录到防重表里面。这样下次仍然收到此事务消息时,可以先在防重表中作判断。如果是存在的则不进行处理。 - 何防止接收到重复消息?在B系统处理完消息之后,通过消息队列或RPC的方式,一定要告知系统A这条事务我处理完了,你那边也要进行事务状态变更,或者直接删除此事务。如此这条事务就得到了最终确认。另外一个可能在系统B处理完业务,但删除消息队列数据时失败了,导致整个业务操作被回滚。只要没有被消息队列删除,这条消息仍然是可见的,它下次还会被消费到。