1. 通过rocketmq实现分布式事务
1.1. 实现原理
1.1.1. 通常说的分布式事务
一次请求-调用serviceA
serviceA 中会调用serviceB,serviceC,serviceD
在serviceA ,serviceB,serviceC,serviceD中的事务要么一起成功
只要有一个节点事务失败了, 其他的节点都会将事务回滚
1.1.2. rocketmq的分布式事务
rocketmq的分布式事务与【通常说的分布式事务】有所不同
它会做最大努力(重试)让所有节点一起成功, 不保证一定成功
对于不成功的情况 应设置人工兜底补偿
如:
在美团外卖下单了,并不能一定收到外卖
可能鱼没有了 或者送货人手不够(不保证一定成功)
这时商家会联系顾客换个菜 或退款
1.2. 环境
rocketmq安装在ubuntu的docker中
在win10下的idea中启动项目,访问mq
- jdk- 1.8
- spring boot- 2.1.4
- rocketmq- 4.4.0
1.3. 流程描述
访问web 127.0.0.1:8080/payed
表示某个订单完成支付
发送一个订单支付成功的事务消息
此时先在本地执行扣钱操作
mq在扣钱完后,将消息被发给其他的服务节点
节点PointsConsumer 节点RepoConsumer在收到消息后进行相应的事务操作
实际过程中消息可能会被重复消费 需开发人员注意幂等处理
1.4. rocketmq安装
1
2
3
4
5
6
7
8
9
10
11
|
#端口情况 nameServer:9876, broker:{10911,10909}
docker run -d --name mqnamesrv \
rocketmqinc/rocketmq:4.4.0 sh mqnamesrv
docker run -d --name mqbroker \
--link mqnamesrv -e "NAMESRV_ADDR=mqnamesrv:9876" \
rocketmqinc/rocketmq:4.4.0 sh mqbroker
#获得nameserver的ip
docker inspect -f {{.NetworkSettings.IPAddress}} mqnamesrv
172.17.0.3
|
为访问到虚拟机中docker中的mq
在windows中 添加路由
route -p add 172.17.0.0 mask 255.255.0.0 192.168.88.3
然后win10下, ping下172.17.0.3看是否连通
2. 代码篇
2.1. spring boot + mq生产者
关于mq生产者的3个主要配置
LocalOrderConsumer MqConfig ProduceConfig
2.1.1. pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.fluffy</groupId>
<artifactId>de-rocketmq-transaction</artifactId>
<version>0.1</version>
<packaging>jar</packaging>
<name>de-rocketmq-transaction</name>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
</dependencies>
</project>
|
2.1.2. application.properties
1
2
3
4
5
6
7
|
server.port=8080
spring.application.name=mq-tran
mq.groupName= ${spring.application.name}
mq.namesrvAddr= 172.17.0.3:9876
topic.orderPaySuccess=order2PaySuccess
|
2.1.3. App_Main.java
1
2
3
4
5
6
|
@SpringBootApplication
public class App_Main {
public static void main(String[] args) {
SpringApplication.run(App_Main.class, args);
}
}
|
2.1.4. OrderController.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
@RestController
@Log4j2
public class OrderController {
@Value("${topic.orderPaySuccess}")
String orderTopic;
@Autowired
private TransactionMQProducer tsaProducer;
static int num = 0;
@RequestMapping("payed")
public String payed() throws Exception {
// 模拟订单数据
String orderNo = "od-" + num++;
// 处理消息时可能会用到
String param = "productId=1234&buyerId=101";
Message msg = new Message(orderTopic, "tag1", orderNo, param.getBytes());
tsaProducer.setSendMsgTimeout(2000);
SendStatus status = tsaProducer
.sendMessageInTransaction(msg, null)
.getSendStatus();
return status.name();
}
@RequestMapping("hello")
public String hello() {
return "hello-order";
}
}
|
2.1.5. LocalOrderConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 该服务执行成功则通知其他订阅者去消费
*/
public class LocalOrderConsumer implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
// 通常db是去重表 保证处理幂等
private ConcurrentHashMap<String, Integer> db = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
System.out.println("本地事务-处理订单-" + msg.getKeys());
if (1 == status) {// 有三分之一机会付钱成功
System.out.println("本地事务-订单" + msg.getKeys() + "状态更新为成功");
System.out.println("本地事务-扣除冻结款-更新账户的余额");
} else if (2 == status) {
System.out.println("本地事务-库存不足,订单已取消");
}
db.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
// 当长时间没有得到是否处理的响应(成功或失败)
// 比如扣钱完毕 提交后 系统突然挂掉了,还没来得及通知mq
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 此时检查
Integer status = db.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1: // 支付成功, 消息
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
|
2.1.6. MqConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@ConfigurationProperties(prefix = "mq")
@Configuration
public class MqConfig {
String namesrvAddr;
String groupName;
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
}
|
2.1.7. ProduceConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Log4j2
@Configuration
public class ProduceConfig {
@Autowired
MqConfig mqConfig;
@Bean("tsaProducer")
public TransactionMQProducer transactionMQProducer() {
String producerGroup = mqConfig.getGroupName() + "-tsa-consumer";
log.info("TransactionMQProducer 正在创建");
try {
TransactionMQProducer producer = new TransactionMQProducer(producerGroup);
ExecutorService executorService = new ThreadPoolExecutor(2, 4, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1 << 8), (Runnable r) -> {
Thread thread = new Thread(r);
thread.setName("mq-transaction-producer");
return thread;
});
producer.setNamesrvAddr(mqConfig.getNamesrvAddr());
producer.setExecutorService(executorService);
producer.setTransactionListener(new LocalOrderConsumer());
producer.start();
log.info("TransactionMQProducer 创建成功");
return producer;
} catch (Exception e) {
log.error("TransactionMQProducer 创建失败",e);
System.exit(-1);
}
return null;
}
}
|
2.2. 消费者
消费者不一定与生产者在同一个节点
这里时demo就放在同一项目了
2.2.1. AbstractConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
import fluffy.mo.transactionMQ.MqConfig;
import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import java.util.List;
@Configuration
@Log4j2
public abstract class AbstractConsumer {
@Autowired
MqConfig config;
String suffix;
public void setSuffix(String suffix) {
this.suffix = suffix;
}
// 开启消费者监听服务
public void listener(String topic, String tag) {
String producerGroup = config.getGroupName() +"-"+ suffix;
log.info("加载mq-" + topic + ":" + tag + "消费者");
try {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(producerGroup);
consumer.setNamesrvAddr(config.getNamesrvAddr());
consumer.subscribe(topic, tag);
consumer.setConsumeMessageBatchMaxSize(1);//每次处理1条
consumer.setMaxReconsumeTimes(3);//消息最多重试3次
consumer.setConsumeThreadMin(4);
consumer.setConsumeThreadMax(1<<4);//并发处理的线程数
consumer.setMessageModel(MessageModel.CLUSTERING);
// 消费消息
consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context)
-> AbstractConsumer.this.handle(msgs));
consumer.start();
log.info("消费者mq连接成功");
} catch (Exception e) {
log.error("消费者mq连接失败", e);
System.exit(-1);
}
}
// 处理body的业务
public abstract ConsumeConcurrentlyStatus handle(List<MessageExt> msgs);
}
|
2.2.2. RepoConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@Configuration
public class RepoConsumer extends AbstractConsumer implements ApplicationListener<ContextRefreshedEvent> {
// @Autowired
// RepoService repoService;
@Override
public ConsumeConcurrentlyStatus handle(List<MessageExt> msgs) {
MessageExt messageExt = msgs.get(0);
String keys = messageExt.getKeys();
try {
String orderNo = messageExt.getKeys();
System.out.println("远程服务2-开始拣货-" + orderNo);
// repoService...拣货操作
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// log.error("", e);
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
@Value("${topic.orderPaySuccess}")
String orderTopic;
AtomicBoolean connected = new AtomicBoolean(false);
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
if(connected.compareAndSet(false, true)){
super.setSuffix("repo");
super.listener(orderTopic, "tag1");
}
}
}
|
2.2.3. PointsConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@Configuration
public class PointsConsumer extends AbstractConsumer implements ApplicationListener<ContextRefreshedEvent> {
// @Autowired
// PointService pointService;
@Override
public ConsumeConcurrentlyStatus handle(List<MessageExt> msgs) {
MessageExt messageExt = msgs.get(0);
String keys = messageExt.getKeys();
try {
String orderNo = messageExt.getKeys();
System.out.println("远程服务1-增加积分-" + orderNo);
// pointService...增加积分
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// log.error("", e);
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
@Value("${topic.orderPaySuccess}")
String orderTopic;
AtomicBoolean connected = new AtomicBoolean(false);
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
if(connected.compareAndSet(false, true)){
super.setSuffix("point");
super.listener(orderTopic, "tag1");
}
}
}
|
3. 验证
访问5次页面
127.0.0.1:8080/payed
控制台可看到序号 1,4会成功提交
稍等一会可看到
两个消费者 消费了订单1,4
生产环境下 应注意消息消费的幂等处理
文章作者
duansheli
上次更新
2019-12-25
(325c7b3)