使用mq实现分布式事务-补偿事务一致性

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 使用mq实现分布式事务-补偿事务一致性

mq实现分布式事务-补偿事务一致性

-

-

-

严格的来说,消息中间件并不能实现分布式事务,而是通过事后补偿机制,达到和分布式事务一样的数据一致性。这里主要探讨Rocket mqRabbit mq的实现思路。Rocket mq只描述一下实现思路,Rabbit mq会有代码演示。还是那句话,由于水平有限,难免有不当或者错误之处,请大家指正,谢谢。

CAP原则

首先我们得了解CAP原则,又称CAP原理,指的是在一个分布式系统中,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可兼得 。

  • 一致性(C):在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访问同一份最新的数据副本)
  • 可用性(A):在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据更新具备高可用性)
  • 分区容错性(P):以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。

在分布式存储系统中,最多只能实现上面的两点。而由于当前的网络硬件肯定会出现延迟丢包等问题,所以分区容错性是我们必须需要实现的。所以我们只能在一致性和可用性之间进行权衡,没有系统能同时保证这三点。

我的理解,使用mq实现最终补偿事务一致性,是牺牲了同意时刻访问同一份数据的一致性,通过一段时间的延迟,最终达到一致性。

Rocket mq实现思路

首先推荐事务和消息解耦的方式。

  • 开始事务,Prepared消息,RocketMQ会返回消息地址
  • 执行本地事务
  • 事务提交成功,通过拿到的消息地址去修改RocketMQ里面修改消息的状态,消息从Prepared变为发送成功;如果事务失败,则通过消息地址去修改消息状态,变为消息取消。
  • 消息消费方居于Push或者Pull方式消费消息成功后,向服务器发送消费成功的消息通知。

所以,在事务内需要向mq提交两次消息请求,一次是发送,另外一次是确认(确认成功或者取消)。

如果确认消息发送失败了,RocketMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认,所以消息生产方需要实现一个check接口,RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

  • 优点: 实现了最终一致性,不需要依赖本地数据库事务。
  • 缺点: 实现难度大,主流MQ不支持,没有.NET客户端,RocketMQ事务消息部分代码也未开源
  • 极端情况下仍然需要手工介入

Rabbit mq实现思路

Rocket mq有所不一样。

消息生产方:

  • 首先执行本地事务,写消息表
  • 获取消息表的消息记录,向Rabbit mq发送消息
  • 接收Rabbit mq返回的消息确认接收成功通知(ACK),更新消息表;如果失败,则不更新,保证发送消息和记录表状态一致。

这个会存在这样一个问题:消息已经发送成功,但是Rabbit mq没有返回,则无法更新消息表;或者接收到消息成功发送通知,但是更新数据库失败,也无法更新消息表,导致下一次重复发送。解决这个问题的思路,我只想到需要消息接收方要做幂等性检查,从而避免重复消费消息。

消息接收方:

  • Rabbit mq通知有消息
  • 消息接收方接收消息,处理成功或者失败,都要返回确认ACK, Rabbit mq接收到ACK,根据ACK更新消息状态为已经发送,删除队列中的消息;或者更改消息列,等待从新发送。

这里有个问题,如果消息接收方成功处理消息,但是由于特殊情况没有返回ACK, Rabbit mq没有接收到ACK,

这条消息状态已经改变不会再发送,需要手工处理。

消息发送方代码示例:

配置:

1234567
rabbitmq:    host: localhost  #集群配置:addresses:ip1:port1,ip2:port2,ip3:port3    port: 5672    username: guest    password: guest    publisher-confirms: true  #确认消息是否到达broker服务器,也就是只确认是否正确到达exchange中即可,只要正确的到达exchange中,broker即可确认该消息返回给客户端ack    publisher-returns: true   #确认消息是否正确到达queue,如果没有则触发,如果有则不触发

rabbitmq:
host: localhost #集群配置:addresses:ip1:port1,ip2:port2,ip3:port3
port: 5672
username: guest
password: guest
publisher-confirms: true #确认消息是否到达broker服务器,也就是只确认是否正确到达exchange中即可,只要正确的到达exchange中,broker即可确认该消息返回给客户端ack
publisher-returns: true #确认消息是否正确到达queue,如果没有则触发,如果有则不触发

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
package com.sleb.springcloud.rabbitproducerack.service;   import com.sleb.springcloud.modalservice.Users;import com.sleb.springcloud.rabbitproducerack.config.CorrelationDataEx;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service; import java.util.Date; import static com.sleb.springcloud.modalservice.RabbitConfigInfo.EXCHANGE;import static com.sleb.springcloud.modalservice.RabbitConfigInfo.QUEUE_TWO_ROUTING; /** * 如果消息没有到exchange,则confirm回调,ack=false * * 如果消息到达exchange,则confirm回调,ack=true * * exchange到queue成功,则不回调return * * exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了) * 确认方式: * 方式一:channel.waitForConfirms( ) 普通发送方确认模式; * * 方式二:channel.waitForConfirmsOrDie( ) 批量确认模式; * * 方式三:channel.addConfirmListener()异步监听发送方确认模式; * * 采用第三种比较好,异步监听 */@Servicepublic class SenderService  {     @Autowired    private RabbitTemplate rabbitTemplate;     public void sender(Users users) throws Exception {         System.out.println("你好现在是 " + new Date() +"");        System.out.println("HelloSender发送内容 : " + users.toString());          /**         * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。         * ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。         */        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) - {            //Users users1 = (Users)message.getBody().toString();            //String correlationId = message.getMessageProperties().getCorrelationId();             System.out.println("Message : " + new String(message.getBody()));            //System.out.println("Message : " + new String(message.getBody()));            System.out.println("replyCode : " + replyCode);            System.out.println("replyText : " + replyText);  //错误原因            System.out.println("exchange : " + exchange);            System.out.println("routingKey : " + routingKey);//queue名称         });         rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - {            if (ack) {                CorrelationDataEx c = (CorrelationDataEx)correlationData;                System.out.println("发送消息: " + c.getMsg());                System.out.println("HelloSender 消息发送成功 :" + correlationData.toString() );                /**                 * 通过设置correlationData.id为业务主键,消息发送成功后去继续做候选业务。                 */            } else {                System.out.println("HelloSender消息发送失败" + cause);            }        });         /**         * CorrelationDataEx继承CorrelationData, 把需要发送消息的关键字段加入         * 这样confirmcallback可以返回带有关键字段的correlationData,我们可以通过这个来确定发送的是那条业务记录         */        CorrelationDataEx c = new CorrelationDataEx();        c.setId(users.getId().toString());        c.setMsg(users.toString());         /**         * 加上这个,可以从returncallback参数中读取发送的json消息,否则是二进制bytes         * 比如:如果returncallback触发,则表明消息没有投递到队列,则继续业务操作,比如将消息记录标志位未投递成功,记录投递次数         */        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());         rabbitTemplate.convertAndSend(EXCHANGE, QUEUE_TWO_ROUTING, users, c);     } }

package com.sleb.springcloud.rabbitproducerack.service;

import com.sleb.springcloud.modalservice.Users;
import com.sleb.springcloud.rabbitproducerack.config.CorrelationDataEx;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

import static com.sleb.springcloud.modalservice.RabbitConfigInfo.EXCHANGE;
import static com.sleb.springcloud.modalservice.RabbitConfigInfo.QUEUE_TWO_ROUTING;

/**

  • 如果消息没有到exchange,则confirm回调,ack=false *
  • 如果消息到达exchange,则confirm回调,ack=true *
  • exchange到queue成功,则不回调return
  • exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
  • 确认方式:
  • 方式一:channel.waitForConfirms( ) 普通发送方确认模式; *
  • 方式二:channel.waitForConfirmsOrDie( ) 批量确认模式; *
  • 方式三:channel.addConfirmListener()异步监听发送方确认模式; *
  • 采用第三种比较好,异步监听

*/
@Service
public class SenderService {


@Autowired
private RabbitTemplate rabbitTemplate;

public void sender(Users users) throws Exception {

    System.out.println("你好现在是 " + new Date() +"");
    System.out.println("HelloSender发送内容 : " + users.toString());


    /**
     * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。
     * ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。
     */
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) - {
        //Users users1 = (Users)message.getBody().toString();
        //String correlationId = message.getMessageProperties().getCorrelationId();

        System.out.println("Message : " + new String(message.getBody()));
        //System.out.println("Message : " + new String(message.getBody()));
        System.out.println("replyCode : " + replyCode);
        System.out.println("replyText : " + replyText);  //错误原因
        System.out.println("exchange : " + exchange);
        System.out.println("routingKey : " + routingKey);//queue名称

    });

    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - {
        if (ack) {
            CorrelationDataEx c = (CorrelationDataEx)correlationData;
            System.out.println("发送消息: " + c.getMsg());
            System.out.println("HelloSender 消息发送成功 :" + correlationData.toString() );
            /**
             * 通过设置correlationData.id为业务主键,消息发送成功后去继续做候选业务。
             */
        } else {
            System.out.println("HelloSender消息发送失败" + cause);
        }
    });

    /**
     * CorrelationDataEx继承CorrelationData, 把需要发送消息的关键字段加入
     * 这样confirmcallback可以返回带有关键字段的correlationData,我们可以通过这个来确定发送的是那条业务记录
     */
    CorrelationDataEx c = new CorrelationDataEx();
    c.setId(users.getId().toString());
    c.setMsg(users.toString());

    /**
     * 加上这个,可以从returncallback参数中读取发送的json消息,否则是二进制bytes
     * 比如:如果returncallback触发,则表明消息没有投递到队列,则继续业务操作,比如将消息记录标志位未投递成功,记录投递次数
     */
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

    rabbitTemplate.convertAndSend(EXCHANGE, QUEUE_TWO_ROUTING, users, c);

}

}

消息接收方代码示例:

配置:

12345678910111213
rabbitmq:    host: localhost    port: 5672    username: guest    password: guest    template:      mandatory: true      #messageConverter: jackson2JsonMessageConverter 这个必须在程序里面创建bean    listener:      simple:        prefetch: 1        acknowledge-mode: manual        concurrency: 3

rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
template:
mandatory: true
#messageConverter: jackson2JsonMessageConverter 这个必须在程序里面创建bean
listener:
simple:
prefetch: 1
acknowledge-mode: manual
concurrency: 3

123456789101112131415161718192021222324252627282930313233343536373839404142
package com.sleb.springcloud.rabbitreceiverack.service; import com.rabbitmq.client.Channel;import com.sleb.springcloud.modalservice.Users;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.context.annotation.Bean;import org.springframework.stereotype.Service; import java.io.IOException;import java.util.Date; import static com.sleb.springcloud.modalservice.RabbitConfigInfo.QUEUE_ONE_ROUTING; @Servicepublic class Receiver {    @RabbitHandler    @RabbitListener(queues = QUEUE_ONE_ROUTING) //containerFactory = "rabbitListenerContainerFactory", concurrency = "2")    public void process(Users users, Channel channel, Message message) throws IOException {        System.out.println("HelloReceiver收到  : " + users.toString() + "收到时间" + new Date());         try {            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了            // 否则消息服务器以为这条消息没处理掉 后续还会在发            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);            System.out.println("receiver success");        } catch (IOException e) {            e.printStackTrace();            //丢弃这条消息,则不会重新发送了            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);            System.out.println("receiver fail");        }    }     @Bean    public MessageConverter jackson2JsonMessageConverter() {        return new Jackson2JsonMessageConverter();    }}

package com.sleb.springcloud.rabbitreceiverack.service;

import com.rabbitmq.client.Channel;
import com.sleb.springcloud.modalservice.Users;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.Date;

import static com.sleb.springcloud.modalservice.RabbitConfigInfo.QUEUE_ONE_ROUTING;

@Service
public class Receiver {
@RabbitHandler
@RabbitListener(queues = QUEUE_ONE_ROUTING) //containerFactory = “rabbitListenerContainerFactory”, concurrency = “2”)
public void process(Users users, Channel channel, Message message) throws IOException {
System.out.println(“HelloReceiver收到 : “ + users.toString() + “收到时间” + new Date());


    try {
        //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了
        // 否则消息服务器以为这条消息没处理掉 后续还会在发
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        System.out.println("receiver success");
    } catch (IOException e) {
        e.printStackTrace();
        //丢弃这条消息,则不会重新发送了
        //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        System.out.println("receiver fail");
    }
}

@Bean
public MessageConverter jackson2JsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}

}

最需要考虑的两个问题:

  • 消息消费的顺序问题:发送消息指定队列,消息消费者指定队列可以解决,消费者只能一个。
  • 消息消费的重复问题:每次消费消息时候创建一消息表,在消费消息前先查询该表,如果消息存在就说明已经消费。

后记

对于Rocket mq实现的很多细节,大家可以参考官方网站的文档,这里写的比较粗。写博客很累,主要是没有经验,又担心有的地方理解错误,误导大家。出现问题,有的时候需要去从源码哪里找到答案。幸好我的博客只记录我的心得,没有任何其他的目的,聊以自慰吧。

 

本文作者:lazasha

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 使用mq实现分布式事务-补偿事务一致性


 上一篇
分布式与集群的区别是什么? 分布式与集群的区别是什么?
点击上方”python宝典”,关注获取python全套视频, 技术文章第一时间送达! 分布式在IDF05(Intel Developer Forum 2005)上,Intel首席执行官Craig Barrett就取消4GHz芯片计划一事,半
2021-04-05
下一篇 
自己动手实现分布式任务调度框架 自己动手实现分布式任务调度框架
点击上方“Java知音”,选择“置顶公众号” 技术文章第一时间送达! 作者:最后Q眼泪 cnblogs.com/rongdi/p/10548613.html cnblogs.com/rongdi/p&#x
2021-04-05