设为首页 - 加入收藏 ASP站长网(Aspzz.Cn)- 科技、建站、经验、云计算、5G、大数据,站长网!
热搜: 重新 数据 创业者
当前位置: 首页 > 综合聚焦 > 编程要点 > 正文

SpringCloud+RocketMQ完成分布式事务的实践

发布时间:2021-11-19 11:15 所属栏目:13 来源:互联网
导读:一、RocketMQ的分布式事务结构和说明 我们通过下图来了解一下RocketMQ实现分布式事务的结构。采用半消息机制实现分布式事务,半消息顾名思义,就是发送方将消息发送到MQ中的Broker端,这个消息被标记为暂不投递状态,这个时间订阅方是不能收到这个消息的,当
一、RocketMQ的分布式事务结构和说明
  我们通过下图来了解一下RocketMQ实现分布式事务的结构。采用半消息机制实现分布式事务,半消息顾名思义,就是发送方将消息发送到MQ中的Broker端,这个消息被标记为“暂不投递”状态,这个时间订阅方是不能收到这个消息的,当发送方将提交了Commit后,这个消息才可以被订阅方收到。如果发送方提交了Rollback,那么订阅方就不会收到以该消息。
 
  RocketMQ采用的是最终一致性事务,因为它使用了半消息机制,订阅方可能收不到消息,但是最终状态是能保持一致的。
 
  我们在讲这个流程前,我们需要先看下图的红色部分,当发送方发给MQ的Commit/Rollback没有收到的时候,这个时候需要启用一个线程,从本地事务库里边查找到当前的状态,根据当前的状态来决定是否重新发送Commit/Rollback,相当于重试。
 
 
 
  接下来,我们说一下整个执行的过程。
 
发送方将半消息发送到MQ中,成功后MQ将消息保持好后将结果同步给发送方,即半消息发送成功。当MQ告诉发送方成功后,我们需要记录下发送到MQ的相关的事情,比如发送的消息内容,发送时间,发送的相关ID,比如事务ID,都要做好相应记录。当本地事务库,也可以理解为日志库,我们可以针对发生的问题进行追溯。本地库记录好事务后,发送方就可以将Commit/Rollback提交到MQ了。当MQ收到了Commit命令后,这个时候就会将该条消息发送给订阅方。收到了Rollback后,那该条消息不会投递给订阅方,消息保存3天后删除。
 
  整个过程就是这些,大家如果觉得有遗漏的,可以告诉我。
 
二、搭建RocketMQ
  我使用的是windos10,所以需要下载windows版本。
 
   进入到RocketMQ的官方地址:http://rocketmq.apache.org/release_notes/release-notes-4.3.0/
 
   下载二进制文件压缩包。如下图:
 
 
 
  先解压,然后进行环境变量配置,打开菜单,直接输入环境变量。
 
    变量名:ROCKETMQ_HOME
 
    值:解压的release的路径,如D:\rocketmq-all-4.3.0-bin-release
 
  然后打开CMD命令,进入到解压路径中的bin目标,进行nameserver启动。
 
start mqnameserv.cmd
  启动后的效果为:
 
 
 
  再进行broker的启动,启动需要连到的nameserver为127.0.0.1:9876,打开自动创建Topic,这个时候当用到某个Topic没有的情况下会自动创建一个。
 
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
 
 
  这个时候Windows的rocketmq就下载好并且启动成功了。
 
三、事务场景,然后准备工程,运行代码
  因为我们使用的RocketMq是使用的分布式事务是最终一致性的,柔性的,所以我们使用的场景也要考虑到。我们用的场景就是下订单充话费,用户支付了订单,那么直接给用户的进行充值。购买的订单不会立马充值到手机的,需要等一会才到账,这就是最终一致性。
 
  我们使用的如下代码版本号,SpringCloud的版本号要和SpringBoot保持一致,否则会出现类找不到的情况。
 
SpringCloud(Finchley.RELEASE) + SpringBoot2.0.4.RELEASE + RocketMQ4.3 +MySQL + lombok(插件)
 
  我们使用SpringCloud的几个组件:
 
  Euerka Server :用于提供服务注册的能力和发现
 
  Euerka Client : 用于进行服务注册
 
  Feign:用于服务间的调用
 
  Config :用于进行配置
 
  接下来,我们创建需要进行配置的表。
 
DROP TABLE IF EXISTS `spring_cloud_config`;
 
CREATE TABLE `spring_cloud_config` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `akey` varchar(30) DEFAULT NULL,
  `avalue` varchar(128) DEFAULT NULL,
  `application` varchar(30) DEFAULT NULL,
  `aprofile` varchar(30) DEFAULT NULL,
  `label` varchar(30) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
 
 
 
INSERT INTO `spring_cloud_config` (`id`, `akey`, `avalue`, `application`, `aprofile`, `label`)
VALUES
    (2,'name_server','127.0.0.1:9876','product-service','dev','dev'),
    (3,'name_server','127.0.0.1:9876','order-service','dev','dev'),
    (4,'order_topic','order_topic','order-service','dev','dev'),
    (5,'order_topic','order_topic','product-service','dev','dev');
  我们构建了4个模块,eureka模块用于服务注册和发现,springcloud-config将数据库创建的配置加载供其他工程使用。charge-order-service用于下充值订单,phone-charge-service用于给手机进行充值业务。
 
 
 
  主要的流程为,通过charge-order-service产生了订单,然后将订单和数量到phone-charge-service进行二次确认看是否还有足够的数量可以使用,如果数量不够的话直接将事务进行rollback,这样消息就不会到消费端。如果下过去的订单号已经充值过了,那么该消息将会被直接丢掉,这也是消息端的幂等设计。
 
  接下来,我们看一下生产者以及事务的核心代码。主要用于连接RocketMQ, 执行本地事务,在本地事务中进行二次确认,根据结果进行Commit、Rollback。当连接RocketMQ出现问题的时候可能会收到UNKNOWN,这个时候会调用checkLocalTransaction()方法,用于检查是否将消息发送给RocketMQ了。
 
package com.hqs.chargeorderservice.mqservice;
 
import com.alibaba.fastjson.JSONObject;
import com.hqs.chargeorderservice.config.Jms;
import com.hqs.chargeorderservice.service.ProduceOrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import java.util.concurrent.*;
 
 

(编辑:ASP站长网)

    网友评论
    推荐文章
      热点阅读