前言现在开发项目都是基于SpringBoot,新项目很少使用Spring,所以我们学习一门技术除了要会原生API,还不得不考虑和SpringBoot集成,本篇文章为SpirngBoot整合RocketMQ案例SpringBoot集成RocketMQ导入依赖这里使用整合RocketMQ的基础依赖:rocketmq-spring-boot-starter<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifa
概述如果业务只涉及到一个数据库的写操作,我们只需要保证这一个事物的提交和回滚,这种事务管理叫传统事物或本地事务,如果业务涉及到多个数据库(多个服务)的写操作,我们需要保证多个数据库同时提交或回滚,这种夸多个数据库的事务操作叫分布式事务。分布式事物的解决方案有很多,如:2PC,TCC,最终一致性,最大努力通知等等。这里要介绍的是基于RocketMQ事务消息的最终一致性方案分布式事务用户注册成功,向用户数据库保存用户信息,同时通过远程调用积分服务为用户赠送积分,模型如下:我们需要使用分布式事务管理实现用户数据库和积分数据库的一致性。即:用户保存成功,用户的积分也要保存成功,或者都回滚不做任何存储。
使用场景如果消息过多,每次发送消息都和MQ建立连接,无疑是一种性能开销,批量消息可以把消息打包批量发送,批量发送消息能显著提高传递小消息的性能。批量消息概述批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB,如果超过可以有2种处理方案:1.将消息进行切割成多个小于4M的内容进行发送2.修改4M的限制改成更大可以设置Producer的maxMessageSize属性修改配置文件中的maxMessageSize属性对于消费者而言Consumer的MessageListenerConcurrently监听接口的
使用场景我们通常使用定时任务比如Quartz来解决超时业务,比如:订单支付超时关单,VIP会员超时提醒。但是使用定时任务来处理这些业务场景在数据量大的时候并不是一个很好的选择,会造成大量的空扫描浪费性能。我们可以考虑使用延迟消息来解决。概述延迟消息即:把消息写到Broker后需要延迟一定时间才能被消费,在RocketMQ中消息的延迟时间不能任意指定,而是由特定的等级(1到18)来指定,分别有:messageDelayLevel=1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h可以通过修改配置来增加级别,比如在mq安装目录的broker.conf文件中增加如:m
前言在某些业务场景下是需要消息按照顺序进行消费,比如一个账户的加钱,减钱的动作必须按照时间先后去执行,否则就会发生金额不够导致操作失败。顺序消息故名知意就是消息按照发送的顺序进行消费,队列本身是一种先进先出的数据结构,而RocketMQ理论上说也遵循这种机制。但是默认生产者以RoundRobin轮询方式把消息发送到不同的Queue分区队列;消费者从多个队列中消费消息,这种情况没法保证顺序。所以在RocketMQ中如何保证消息顺序呢?全局有序消息在RocketMQ中消息分为全局有序和局部有序消息,全局有序是一个topic下的所有消息都要保证顺序,如果要保证消息全局顺序消费,就需要保证使用一个队列
前言RocketMQ已经写了两章了,一章是RocketMQ认识和安装,一章是RocketMQ的工作流程和核心概念,本章我们开始使用RocketMQ来发送和接收消息。RocketMQ的消息种类非常多,比如:普通消息,顺序消息,延迟消息,批量发送,消息过滤等等。本篇文章来探讨一下普通消息的发送普通消息发送普通消息这里介绍三种发送方式,同步发送,异步发送,单向发送。我们先导入需要的依赖,版本尽量和RocketMQ的安装版本一致。<dependencies><dependency><groupId>org.apache.rocketmq</groupId>
前言本篇文章我们来一起来探讨一下RocketMQ的工作原理和一些核心概念,参考:RocketMQ开发官方文档:https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.mdRocketMQ的架构RocketMQ的集群架构如下(图片来源于官网)RocketMQ架构上主要分为四部分,如上图所示Producer消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。Consumer消息消费的角色,支持分布式集群方式部
前言最近挺忙的,好久没更新文章了,最近在搞RocketMQ,那就先发点这个,Netty的文章等我空了再继续更。一.MQ概述1.MQ是什么MQ全称为MessageQueue,即消息队列,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生产、存储、消费全过程的软件系统,遵循FIFO原则。在高并发的分布式系统中使用居多。2.为什么用MQ为什么要使用MQ,我从生活中发现的一个例子来举例说明,即:天府通刷地铁出站的时候不是立马扣费,而是提示“出站成功,费用将会在稍后结算”,我们假设天府通后台扣费流程如下:【注意】假如天府通出站API和支付系统是不同的子系统(两个子系统远程通信)上下班高
1、官网RocketMQ为SpringBoot提供了整合方案,官网地址如下,上面提供了详细的整合步骤及案例。https://github.com/apache/rocketmq-spring官方详细文档(可以切换不同的版本)https://github.com/apache/rocketmq-spring/blob/release-2.0.1/README.md2、消息生产者1)添加依赖<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starte
1、基本步骤1.1、导入MQ客户端依赖<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.8.0</version></dependency>1.2、消息发送者步骤分析1.创建消息生产者producer,并制定生产者组名2.指定Nameserver地址3.启动producer4.创建消息对象,指定主题Topic、Tag和消息体5.发送消息6.关闭生产者
1、消息存储分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。消息生成者发送消息MQ收到消息,将消息进行持久化,在存储中新增一条记录返回ACK给生产者MQpush消息给对应的消费者,然后等待消费者返回ACK如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新push消息,重复执行4、5、6步骤MQ删除消息1.1、存储介质关系型数据库DBApache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配
1、什么是死信队列当一条消息初次消费失败,消息队列会自动进行消费重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。这个队列就是死信队列(Dead-LetterQueue,DLQ),而其中的消息则称为死信消息(Dead-LetterMessage,DLM)。死信队列是用于处理无法被正常消费的消息的。2、死信队列的特征死信队列具有如下特征:死信队列中的消息不会再被消费者正常消费,即DLQ对于消费者是不可见的死信存储有效期与正常消息相同,均为3天(commitlog文件的过期时间),3天后
1、顺序消息的消费重试对于顺序消息,当Consumer消费消息失败后,为了保证消息的顺序性,其会自动不断地进行消息重试,直到消费成功。消费重试默认间隔时间为1000毫秒。重试期间应用会出现消息消费被阻塞的情况。DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("cg");//顺序消息消费失败的消费重试时间间隔,单位毫秒,默认为1000,其取值范围为[10,30000]consumer.setSuspendCurrentQueueTimeMillis(100);由于对顺序消息的重试是无休止的,不间断的,直至消费成功,
1、说明Producer对发送失败的消息进行重新发送的机制,称为消息发送重试机制,也称为消息重投机制。对于消息重投,需要注意以下几点:生产者在发送消息时,若采用同步或异步发送方式,发送失败会重试,但oneway消息发送方式发送失败是没有重试机制的只有普通消息具有发送重试机制,顺序消息是没有的消息重投机制可以保证消息尽可能发送成功、不丢失,但可能会造成消息重复。消息重复在RocketMQ中是无法避免的问题消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会成为大概率事件producer主动重发、consumer负载变化(发生Rebalance,不会导致消息重复,但可能出现重复消费
消息者在进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定条件进行过滤,即可以订阅比Topic更加细粒度的消息类型。对于指定Topic消息的过滤有两种过滤方式:Tag过滤与SQL过滤。1、Tag过滤通过consumer的subscribe()方法指定要订阅消息的Tag。如果订阅多个Tag的消息,Tag间使用或运算符(双竖线||)连接。DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("CID_EXAMPLE");consumer.subscribe("TOPIC&
1、批量发送消息1.1、发送限制生产者进行消息发送时可以一次发送多条消息,这可以大大提升Producer的发送效率。不过需要注意以下几点:批量发送的消息必须具有相同的Topic批量发送的消息必须具有相同的刷盘策略批量发送的消息不能是延时消息与事务消息1.2、批量发送大小默认情况下,一批发送的消息总大小不能超过4MB字节。如果想超出该值,有两种解决方案:方案一:将批量消息进行拆分,拆分为若干不大于4M的消息集合分多次批量发送方案二:在Producer端与Broker端修改属性Producer端需要在发送之前设置Producer的maxMessageSize属性Broker端需要修改其加载的配置文
1、问题引入这里的一个需求场景是:工行用户A向建行用户B转账1万元。我们可以使用同步消息来处理该需求场景:工行系统发送一个给B增款1万元的同步消息M给Broker消息被Broker成功接收后,向工行系统发送成功ACK工行系统收到成功ACK后从用户A中扣款1万元建行系统从Broker中获取到消息M建行系统消费消息M,即向用户B中增加1万元这其中是有问题的:若第3步中的扣款操作失败,但消息已经成功发送到了Broker。对于MQ来说,只要消息写入成功,那么这个消息就可以被消费。此时建行系统中用户B增加了1万元。出现了数据不一致问题。2、解决思路解决思路是,让第1、2、3步具有原子性,要么全部成功,要
1、什么是延时消息当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息。采用RocketMQ的延时消息可以实现定时任务的功能,而无需使用定时器。典型的应用场景是,电商交易中超时未支付关闭订单的场景,12306平台订票超时未支付取消订票的场景。在电商平台中,订单创建时会发送一条延迟消息。这条消息将会在30分钟后投递给后台业务系统(Consumer),后台业务系统收到该消息后会判断对应的订单是否已经完成支付。如果未完成,则取消订单,将商品再次放回到库存;如果完成支付,则忽略。在12306平台中,车票预订成功后就会发送一条延迟消息。这条消息将会在45分钟后投递给后台业务系统(
1、什么是顺序消息顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。默认情况下生产者会把消息以RoundRobin轮询方式发送到不同的Queue分区队列;而消费消息时会从多个Queue上拉取消息,这种情况下的发送和消费是不能保证顺序的。如果将消息仅发送到同一个Queue中,消费时也只从这个Queue上拉取消息,就严格保证了消息的顺序性。2、为什么需要顺序消息例如,现在有TOPICORDER_STATUS(订单状态),其下有4个Queue队列,该Topic中的不同消息用于描述当前订单的不同状态。假设订单有状态:未支付、已支付、发货中、发货成功、发货失败。根据以上订单状态,生产者从
1、消息发送分类Producer对于消息的发送方式也有多种选择,不同的方式会产生不同的系统效果。1.1、同步发送消息同步发送消息是指,Producer发出⼀条消息后,会在收到MQ返回的ACK之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。1.2、异步发送消息异步发送消息是指,Producer发出消息后无需等待MQ返回ACK,直接发送下⼀条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。1.3、单向发送消息单向发送消息是指,Producer仅负责发送消息,不等待、不处理MQ的ACK。该发送方式时MQ也不返回ACK。该方式的消息发送效率最高,但消息可靠性较差。2、代码举例
1、概念消息处理流程中,如果Consumer的消费速度跟不上Producer的发送速度,MQ中未处理的消息会越来越多(进的多出的少),这部分消息就被称为堆积消息。消息出现堆积进而会造成消息的消费延迟。以下场景需要重点关注消息堆积和消费延迟问题:业务系统上下游能力不匹配造成的持续堆积,且无法自行恢复。业务系统对消息的消费实时性要求较高,即使是短暂的堆积造成的消费延迟也无法接受。2、产生原因分析Consumer使用长轮询Pull模式消费消息时,分为以下两个阶段:2.1、消息拉取Consumer通过长轮询Pull模式批量拉取的方式从服务端获取消息,将拉取到的消息缓存到本地缓冲队列中。对于拉取式消费,
1、什么是消费幂等当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这个消费过程就是消费幂等的。幂等:若某操作执行多次与执行一次对系统产生的影响是相同的,则称该操作是幂等的。在互联网应用中,尤其在网络不稳定的情况下,消息很有可能会出现重复发送或重复消费。如果重复的消息可能会影响业务处理,那么就应该对消息做幂等处理。2、消息重复的场景分析什么情况下可能会出现消息被重复消费呢?最常见的有以下三种情况:2.1、发送时消息重复当一条消息已被成功发送到Broker并完成持久化,此时出现了网络闪断,从而导致Broker对Prod
消息被消费过后会被清理掉吗?不会的。消息是被顺序存储在commitlog文件的,且消息大小不定长,所以消息的清理是不可能以消息为单位进行清理的,而是以commitlog文件为单位进行清理的。否则会急剧下降清理效率,并实现逻辑复杂。commitlog文件存在一个过期时间,默认为72小时,即三天。除了用户手动清理外,在以下情况下也会被自动清理,无论文件中的消息是否被消费过:文件过期,且到达清理时间点(默认为凌晨4点)后,自动清理过期文件文件过期,且磁盘空间占用率已达过期清理警戒线(默认75%)后,无论是否达到清理时间点,都会自动清理过期文件磁盘占用率达到清理警戒线(默认85%)后,开始按照设定好的
这里的offset指的是Consumer的消费进度offset。消费进度offset是用来记录每个Queue的不同消费组的消费进度的。根据消费进度记录器的不同,可以分为两种模式:本地模式和远程模式。1、offset本地管理模式当消费模式为广播消费时,offset使用本地模式存储。因为每条消息会被所有的消费者消费,每个消费者管理自己的消费进度,各个消费者之间不存在消费进度的交集。Consumer在广播消费模式下offset相关数据以json的形式持久化到Consumer本地磁盘文件中,默认文件路径为当前用户主目录下的.rocketmq_offsets/${clientId}/${group}/O
订阅关系的一致性指的是,同一个消费者组(GroupID相同)下所有Consumer实例所订阅的Topic与Tag及对消息的处理逻辑必须完全一致。否则,消息消费的逻辑就会混乱,甚至导致消息丢失。1、正确订阅关系多个消费者组订阅了多个Topic,并且每个消费者组里的多个消费者实例的订阅关系保持了一致。2、错误订阅关系一个消费者组订阅了多个Topic,但是该消费者组里的多个Consumer实例的订阅关系并没有保持一致。2.1、订阅了不同Topic该例中的错误在于,同一个消费者组中的两个Consumer实例订阅了不同的Topic。Consumer实例1-1:(订阅了topic为jodie_test_A
消费者从Broker中获取消息的方式有两种:pull拉取方式和push推动方式。消费者组对于消息消费的模式又分为两种:集群消费Clustering和广播消费Broadcasting。1、获取消费类型1.1、拉取式消费Consumer主动从Broker中拉取消息,主动权由Consumer控制。一旦获取了批量消息,就会启动消费过程。不过,该方式的实时性较弱,即Broker中有了新的消息时消费者并不能及时发现并消费。由于拉取时间间隔是由用户指定的,所以在设置该间隔时需要注意平稳:间隔太短,空请求比例会增加;间隔太长,消息的实时性太差1.2、推送式消费该模式下Broker收到数据后会主动推送给Cons
indexFile除了通过通常的指定Topic进行消息消费外,RocketMQ还提供了根据key进行消息查询的功能。该查询是通过store目录中的index子目录中的indexFile进行索引实现的快速查询。当然,这个indexFile中的索引数据是在包含了key的消息被发送到Broker时写入的。如果消息中没有包含key,则不会写入。1、索引条目结构每个Broker中会包含一组indexFile,每个indexFile都是以一个时间戳命名的(这个indexFile被创建时的时间戳)。每个indexFile文件由三部分构成:indexHeader,slots槽位,indexes索引数据。每个i
RocketMQ中的消息存储在本地文件系统中,这些相关文件默认在当前用户主目录下的store目录中。abort:该文件在Broker启动后会自动创建,正常关闭Broker,该文件会自动消失。若在没有启动Broker的情况下,发现这个文件是存在的,则说明之前Broker的关闭是非正常关闭。checkpoint:其中存储着commitlog、consumequeue、index文件的最后刷盘时间戳commitlog:其中存放着commitlog文件,而消息是写在commitlog文件中的config:存放着Broker运行期间的一些配置数据consumequeue:其中存放着consumequeu
1、消息的生产过程Producer可以将消息写入到某Broker中的某Queue中,其经历了如下过程:Producer发送消息之前,会先向NameServer发出获取消息Topic的路由信息的请求NameServer返回该Topic的路由表及Broker列表Producer根据代码中指定的Queue选择策略,从Queue列表中选出一个队列,用于后续存储消息Produer对消息做一些特殊处理,例如,消息本身超过4M,则会对其进行压缩Producer向选择出的Queue所在的Broker发出RPC请求,将消息发送到选择出的Queue路由表:实际是一个Map,key为Topic名称,value是一个
1、集群架构这里要搭建一个双主双从异步复制的Broker集群。为了方便,这里使用了两台主机来完成集群的搭建。这两台主机的功能与broker角色分配如下表。序号主机名IP功能broker角色1rocketmqOS1192.168.216.128NameServer+BrokerMaster1+Slave22rocketmqOS2192.168.216.129NameServer+BrokerMaster2+Slave12、克隆生成rocketmqOS1克隆rocketmqOS主机,并修改配置。指定主机名为rocketmqOS1。3、修改rocketmqOS1配置文件3.1、配置文件位置要修改的配