1. 通过rocketmq实现分布式事务

1.1. 实现原理

1.1.1. 通常说的分布式事务

一次请求-调用serviceA
serviceA 中会调用serviceB,serviceC,serviceD
在serviceA ,serviceB,serviceC,serviceD中的事务要么一起成功
只要有一个节点事务失败了, 其他的节点都会将事务回滚

1.1.2. rocketmq的分布式事务

rocketmq的分布式事务与【通常说的分布式事务】有所不同
它会做最大努力(重试)让所有节点一起成功, 不保证一定成功
对于不成功的情况 应设置人工兜底补偿

如:
在美团外卖下单了,并不能一定收到外卖
可能鱼没有了 或者送货人手不够(不保证一定成功)
这时商家会联系顾客换个菜 或退款

1.2. 环境

rocketmq安装在ubuntu的docker中
在win10下的idea中启动项目,访问mq

  • jdk- 1.8
  • spring boot- 2.1.4
  • rocketmq- 4.4.0

1.3. 流程描述

访问web  127.0.0.1:8080/payed  
表示某个订单完成支付  
发送一个订单支付成功的事务消息  
此时先在本地执行扣钱操作  
mq在扣钱完后,将消息被发给其他的服务节点  
节点PointsConsumer 节点RepoConsumer在收到消息后进行相应的事务操作  

实际过程中消息可能会被重复消费 需开发人员注意幂等处理

1.4. rocketmq安装

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
#端口情况 nameServer:9876, broker:{10911,10909}
docker run -d --name mqnamesrv \

rocketmqinc/rocketmq:4.4.0 sh mqnamesrv

docker run -d --name mqbroker \

--link mqnamesrv -e "NAMESRV_ADDR=mqnamesrv:9876" \

rocketmqinc/rocketmq:4.4.0 sh mqbroker 

#获得nameserver的ip
docker inspect -f {{.NetworkSettings.IPAddress}} mqnamesrv
172.17.0.3
为访问到虚拟机中docker中的mq  
在windows中 添加路由  
route -p add 172.17.0.0 mask 255.255.0.0 192.168.88.3  
然后win10下,  ping下172.17.0.3看是否连通  

2. 代码篇

2.1. spring boot + mq生产者

关于mq生产者的3个主要配置
LocalOrderConsumer MqConfig ProduceConfig

2.1.1. pom.xml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.fluffy</groupId>
    <artifactId>de-rocketmq-transaction</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <name>de-rocketmq-transaction</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath/>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
    </dependencies>
</project>

2.1.2. application.properties

1
2
3
4
5
6
7
server.port=8080
spring.application.name=mq-tran

mq.groupName= ${spring.application.name}
mq.namesrvAddr= 172.17.0.3:9876

topic.orderPaySuccess=order2PaySuccess

2.1.3. App_Main.java

1
2
3
4
5
6
@SpringBootApplication
public class App_Main {
    public static void main(String[] args) {
        SpringApplication.run(App_Main.class, args);
    }
}

2.1.4. OrderController.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@RestController
@Log4j2
public class OrderController {
    @Value("${topic.orderPaySuccess}")
    String orderTopic;
    @Autowired
    private TransactionMQProducer tsaProducer;

    static int num = 0;
    @RequestMapping("payed")
    public String payed() throws Exception {
        // 模拟订单数据
        String orderNo = "od-" + num++;
        // 处理消息时可能会用到
        String param = "productId=1234&buyerId=101";

        Message msg = new Message(orderTopic, "tag1", orderNo, param.getBytes());
        tsaProducer.setSendMsgTimeout(2000);
        SendStatus status = tsaProducer
                .sendMessageInTransaction(msg, null)
                .getSendStatus();
        return status.name();
    }
    @RequestMapping("hello")
    public String hello() {
        return "hello-order";
    }
}

2.1.5. LocalOrderConsumer.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 该服务执行成功则通知其他订阅者去消费
 */
public class LocalOrderConsumer implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    // 通常db是去重表 保证处理幂等
    private ConcurrentHashMap<String, Integer> db = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        System.out.println("本地事务-处理订单-" + msg.getKeys());
        if (1 == status) {// 有三分之一机会付钱成功
            System.out.println("本地事务-订单" + msg.getKeys() + "状态更新为成功");
            System.out.println("本地事务-扣除冻结款-更新账户的余额");
        } else if (2 == status) {
            System.out.println("本地事务-库存不足,订单已取消");
        }
        db.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    // 当长时间没有得到是否处理的响应(成功或失败)
    // 比如扣钱完毕 提交后 系统突然挂掉了,还没来得及通知mq
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 此时检查
        Integer status = db.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1: // 支付成功, 消息
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

2.1.6. MqConfig.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
@ConfigurationProperties(prefix = "mq")
@Configuration
public class MqConfig {
    String namesrvAddr;
    String groupName;

    public String getNamesrvAddr() {
        return namesrvAddr;
    }

    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    public String getGroupName() {
        return groupName;
    }

    public void setGroupName(String groupName) {
        this.groupName = groupName;
    }
}

2.1.7. ProduceConfig.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Log4j2
@Configuration
public class ProduceConfig {
    @Autowired
    MqConfig mqConfig;

    @Bean("tsaProducer")
    public TransactionMQProducer transactionMQProducer() {
        String producerGroup = mqConfig.getGroupName() + "-tsa-consumer";
        log.info("TransactionMQProducer 正在创建");
        try {
            TransactionMQProducer producer = new TransactionMQProducer(producerGroup);
            ExecutorService executorService = new ThreadPoolExecutor(2, 4, 100, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(1 << 8), (Runnable r) -> {
                Thread thread = new Thread(r);
                thread.setName("mq-transaction-producer");
                return thread;
            });
            producer.setNamesrvAddr(mqConfig.getNamesrvAddr());
            producer.setExecutorService(executorService);
            producer.setTransactionListener(new LocalOrderConsumer());
            producer.start();
            log.info("TransactionMQProducer 创建成功");
            return producer;
        } catch (Exception e) {
            log.error("TransactionMQProducer 创建失败",e);
            System.exit(-1);
        }
        return null;
    }
}

2.2. 消费者

消费者不一定与生产者在同一个节点
这里时demo就放在同一项目了

2.2.1. AbstractConsumer.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import fluffy.mo.transactionMQ.MqConfig;
import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import java.util.List;

@Configuration
@Log4j2
public abstract class AbstractConsumer {

    @Autowired
    MqConfig config;
    String suffix;

    public void setSuffix(String suffix) {
        this.suffix = suffix;
    }

    // 开启消费者监听服务
    public void listener(String topic, String tag) {
        String producerGroup = config.getGroupName() +"-"+ suffix;
        log.info("加载mq-" + topic + ":" + tag + "消费者");
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(producerGroup);
            consumer.setNamesrvAddr(config.getNamesrvAddr());
            consumer.subscribe(topic, tag);
            consumer.setConsumeMessageBatchMaxSize(1);//每次处理1条
            consumer.setMaxReconsumeTimes(3);//消息最多重试3次
            consumer.setConsumeThreadMin(4);
            consumer.setConsumeThreadMax(1<<4);//并发处理的线程数
            consumer.setMessageModel(MessageModel.CLUSTERING);
            // 消费消息
            consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context)
                    -> AbstractConsumer.this.handle(msgs));

            consumer.start();
            log.info("消费者mq连接成功");
        } catch (Exception e) {
            log.error("消费者mq连接失败", e);
            System.exit(-1);
        }
    }

    // 处理body的业务
    public abstract ConsumeConcurrentlyStatus handle(List<MessageExt> msgs);

}

2.2.2. RepoConsumer.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

@Configuration
public class RepoConsumer extends AbstractConsumer implements ApplicationListener<ContextRefreshedEvent> {
    //    @Autowired
//    RepoService repoService;
    @Override
    public ConsumeConcurrentlyStatus handle(List<MessageExt> msgs) {
        MessageExt messageExt = msgs.get(0);
        String keys = messageExt.getKeys();
        try {
            String orderNo = messageExt.getKeys();
            System.out.println("远程服务2-开始拣货-" + orderNo);
            // repoService...拣货操作
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            // log.error("", e);
        }

        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }

    @Value("${topic.orderPaySuccess}")
    String orderTopic;
    AtomicBoolean connected = new AtomicBoolean(false);
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        if(connected.compareAndSet(false, true)){
            super.setSuffix("repo");
            super.listener(orderTopic, "tag1");
        }
    }
}

2.2.3. PointsConsumer.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

@Configuration
public class PointsConsumer extends AbstractConsumer implements ApplicationListener<ContextRefreshedEvent> {
//    @Autowired
//    PointService pointService;

    @Override
    public ConsumeConcurrentlyStatus handle(List<MessageExt> msgs) {
        MessageExt messageExt = msgs.get(0);
        String keys = messageExt.getKeys();
        try {
            String orderNo = messageExt.getKeys();
            System.out.println("远程服务1-增加积分-" + orderNo);
            // pointService...增加积分
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            // log.error("", e);
        }
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }

    @Value("${topic.orderPaySuccess}")
    String orderTopic;
    AtomicBoolean connected = new AtomicBoolean(false);

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        if(connected.compareAndSet(false, true)){
            super.setSuffix("point");
            super.listener(orderTopic, "tag1");
        }
    }
}

3. 验证

访问5次页面
127.0.0.1:8080/payed
控制台可看到序号 1,4会成功提交
稍等一会可看到
两个消费者 消费了订单1,4

生产环境下 应注意消息消费的幂等处理