rabbit mq 是什么东西? rabbit mq 也就是消息队列中间键。消息中间键是在消息的传递过程中保存信信息的容器。消息中间键再将消息从他的圆中到他的目标中标识充当中间人的作用。 队列的主要目的是提供漏油,并保证消息的传递。如果发送消息时,接收者不可用消息,队列不会保留消息,直到可以成功的传递为止。当然,消息队列保存消息也是有期限的。
粉丝1202获赞2514

对于工作了几年的加瓦程序员,消息中间键是面试经常被问到的问题,一般面试官会根据你的简历和你了解的 mq 产品来问问题,所以面试之前你也不用把每个消息对列产品都准备一遍。 目前主流的消息对列产品有 ruby democo、 rocky democo、 卡不卡和 pasa。 比如下面的这个 rapidmoco 的问题该如何回答呢? ruby democo 的消息是如何进行路由的?好,那首先我们看一下 rapidoco 它的整体结构,有一个发布者,有一个消费者,中间是我们 ruby democo, 那么它里边有一个交换机, 然后有一个对列,那么交换机和对列之间啊,有一个绑定关系。好,那么有了这个整体结构以后,那专门扣的消息是如何进行录? 首先第一种情况啊,是最简单的一种情况,我们有一个发布者,有一个消费者,然后我们发布者把消息发布到对面上,那么这个时候没有指定交换机,它采用的是默认的交换机。 那么第二种情况就是我们有一个交换机,有两个对列,那么这两个对列呢,都和我们这个交换机进行了绑定,所以这个时候我们发布者发送一个消息的时候,这个消息会录入到这两个对列上, 也就是只要这个对列和这个交换机绑定了,那我们发送者发送消息的时候,交换机就会把这个消息分别投递到这两个对联上。好,那下面我们看一下。第三种情况, 我们有一个交换机,有两个对列,那么这两个对列和这个交换机之间的绑定关系是通过一个路由键进行绑定的,比如说上面 这个对列和我们交换机的绑定是通过一个 iro 这个路由键进行绑定,那下边这个对列和交换机绑定,它是用了 infoiro 和 woni, 哎,用这三个键进行绑定的,那我们现在比如说我发一个消息,如果我是用 iro 这个路由键来发送消息, 那这个时候他就会投递到上面这个对列和下边这个对列。因为上面这个对列是通过 lr 和交换机绑定,下面这个对列也有一个 l 这个路由键进行绑定,下边对列也和交换机用 lr 进行绑定了。所以此时我们通过 lr 这个路由键发送消息的时候, 它既可以投递到上面的对列,也可以投递到下边的对列,那现在如果我用个 infor 来作为路由键发送消息,那这个时候这个消息只可以投递到下边的这个对列上。好,那今 接下来我们看一下第四种情况,就是我们有一个交换机有两个对列,那么这个对列和它绑定的时候,它采用的录入键,哎是新号,点 orange 点新号, 那么这个呢是一个正则匹配信号,代表一个单词,所以我们这个路由键就是中间是 orange, 前后分别有一个单词,如果你满足这个路由键,那么这个时候就头对到我们上面这个对列, 比如说我们下边的这样一个路由 k, 它就满足这个路由键前面一个单词,后面一个单词中间是 orange, 那么你通过这个路由键发送消息的时候,就投递到我们上边这个对列。 好,那我们看一下这样一个路由键,他刚好满足我们这个路由键,所以你通过这个路由键投递消息的时候,他就会投递到我们下边这个对列。好,那我们看一下下边的这个路由键,我们的井号是代表多 多个单词,所以我们这个 lazy 后边有多个单词,那么这样的话呢,他就满足这个路由键,所以我们通过这个路由键发送消息的时候,他就会投入到下边的这个呢对列里边。好,那么这是我们第四种情况, 那么第五种情况就是我们这个图,我们有一个交换机,有两个对列,那么交换机和对列之间的关系呢?它是通过一个头部属性来进行绑定的,那么只要你这个头部属性满足这两个键值对的要求, 那就投递到我们上面这个队列,满足下边这两个键字队要求,就投递到下边这个队列。 那比如说我现在我在消息的这个头部,我设置一个 type 等于 s state 是等于零,那这个时候他就投递到我们下边的这个对列,当然他的这个头部匹配,他有 完全匹配,就是所有的键和值都匹配,还有任何一个键值匹配,那现在我们这个情况是所有的都匹配,还有种情况就是我们只需要有一个键值对匹配,他也可以投递,也就是说 type 和时代的只要有一个匹配,我们就可以把消息投运到这个对接上。 哎,那么他也有这种情况,有完全匹配和任何一个匹配。那么以上呢,就是我们 rumble co 的消息啊,是如何进行路由的?

好,同学们,接下来我们就开始今天的第二章, rabbit mq 的快速入门, 在这一章当中啊,我们会去了解一下 rabyten q, 并且呢去安装它,然后我们会给大家介绍一下 rabytem q 当中的常见消息模型,最后呢我们还会带着大家动手呢,去写代码,看一看 rabyten q 它如何实现这种意步的消息通讯。 好,那接下来我们就进入第一部分啊, rabbit mq 的一个简单认识和安装。 rabbit mq 啊,是一个基于啊浪语言开发的开源消息通讯中年间, 而阿浪语言大家可能不太熟,这是一个面向并发的变成语言,天生就是为了分布式系统来设计的,而 rabbit mq 基于他来 完成,自然就具备了这些个特征,那因此他的一个性能,吞吐量都还是相对来讲不错的啊。但 rabomq 最擅长的其实是消息的可靠性,稳定性,整个系统的这种高可用啊。 那这里呢是 mq 的一个官方网站,我已经提前打开了啊,大家可以看到啊,这个网站啊,这上面有一群的兔子,那另外呢,这边呢还会有一些文档,那大家呢,如果想要去学习 mq 最好呢基于这个文档去进行学习啊, 好,那现在我们再回到 ppt 啊啊,那在这里呢,我们就带大家去安装一下 rabyten q 了啊,安装方式呢,可以参考课前资料, rabyten q 的部署指南, 我们打开看见资料可以看到呢,这里有一个文档,然后咱们 q 不输指南,我们把它打开。 好,那接下来呢,我们就来完成这个部署,这个部署呢有单击部署和集训部署两种方式,我们呢就来看单击部署就行了,我们将在三斗司训练机当中利用刀砍来进行部署,因为这种方式是最简单最方便的嘛。 啊,那第一步肯定是要获取刀壳的镜像了,那么这个镜像大家可以去直接 po 拉取,也可以从科研资料当中去导入这个踏包,而导入的方式我们之前已经讲过了啊,就是利用这个刀壳 low 的命令来完成导入,对吧? 那么下边呢,我们就来打开我们的训练机,我们通过 docker amidis 先查看一下我们本地印象。啊, 好,可以看到,我这里也没有 mq, 对不对?那接下来呢,我就按照刚才文档中所说把它导入一下,那我们打开, 哎,课前资料这里呢有一个套包,现在呢,我们把它上传上来啊,那上传到哪里呢?我们上传到这个 tmp 的目录吧,这临时目录嘛,我们把它拖上来。 好,我们可以看到一百多兆啊, 好,上传完成,完成以后,我们再次回到这个小控台,然后通过一个命令来安装啊,我们先进入天平部路看一眼,有的吧,怎么导入?同学们,命令是什么?告诉我 刀干什么 low 的,干啊,哎, mq 点踏,哎,很好啊,走 好。经过一番安装,我们发现导入成功了,我们通过 docker images 查看一眼,发, 发现这里导入了一个名为 rap mq 版本呢,是三杠 management 的这样一个镜像,已经成功导入了。好,那第一步我们就完成了,我们再次回到文档看一下, 导入了以后,下一步自然就是安装 mq 了,运行这个刀口 rad 命令吗?那这个命令呢?在这我已经给大家贴好了, dock run 杠一 杠一呢是给我们的 mq 设置一个黄金变量,黄金变量我这配了两个,一个是 user, 一个是 password, 也就是用户名和密码。将来我们需要去访问 mq 或者是登录他的管理平台,都需要用这个账号和密码, 再往下干个 name 呢,是给他起个名字,这块呢是配置主机名,不配呢,也没问题,但是如果将来做集群部署就要配这个东西了。杠批呢是他的端口映射,这里我们开放了两个端口啊,一 个是幺五六七二,这个呢是 raby 咱们 q 的管理平台的端口,他会给我们提供一个 ua 界面,我们去管理起来就非常方便了。第二个五六七二呢,是将来做消息通信的一个端口,也就是说我们将来发消息,收消息都要通过这个端口去建立连接。 杠地是后台运行,最后呢是镜像的名称。好,那下边呢,我们就把这个命令啊给他敲一下,我们回到下个控制台,然后在这呢进一个粘贴回车 好容器创建完成,我们通过刀砍 ps 查看一下是不是成功的启动起来了。启动起来以后呢,我们就可以去访问了啊,他端口我们刚才讲了,幺五六七二是他的什么端口啊?是不是空了台端口,所以我们只需要在浏览器里访问这个地址就能够看到了 啊,那现在呢,我们就打开浏览器,在这里输入训练基地地址啊,幺九二点幺六八点幺五零点幺零幺,端口呢是幺五六七二,大家记住这个端口啊, 好,回车可以看到呢,成功的进入了 raby temque 的管理界面了吧,这里啊要输入一个账号和密码,我们刚刚在刀口 rat 命令中啊,指定的是 it cast, 密码是一二三三二一,点击 log in, 这样呢就成功的登入了我们 rabbit mq 的一个管理平台了。同学们, 那么在这个界面当中有很多的东西啊,大家现在都不认识啊,没有关系,等一会呢,我们就会一一给大家介绍。那首先呢,这第一个界面叫 overview, 就是总懒,那么这个界面呢,主要就是 mk 的一些节点的一些详细信息,当前我们是单节点运行,没有集群吧,所以节点中只有一个,然后呢,后边是他的这个硬件的一个信息啊, 然后在这儿 connection, connection 就是连接,将来无论是消息的发布者还是消息的消费者,都应该跟我们的 mq 键连接吧,所以这个 connection 呢,就是连接, 而前头呢是一个通道,将来我们建立连接了以后,一定要创建一个前头通道, 然后生产者或者是消费者才能基于钱脑完成消息的发送或者是接收啊,所以你可以认为钱脑是 mq 当中做消息发送也好,接收也好等等各种操作的一个具体对象了啊,将来每一个连上来的人都应该去创建一个或多个通道啊,再往下呢, x 欠着,这个叫交换机,他是一个消息的路由器啊,就跟咱们那个教室里边的路由器交换机效果是类似的。然后 q 呢,就是对列了, 对列就是来做这个消息存储的啊,那现在呢,可以看到还没有任何的对列吧?刚才交换机看到还是有一些交换机的啊,但是没有对列 好,那最后奥特曼呢?是叫做管理,那么在这个界面当中,我们可以去管理当前这个呃用户信息啊,比方说我现在是不是有个用户叫 it 卡死他呀?那我还可以干什么呢?在这创建更多的用户, 比方说我现在要创建个用户,我只需要在这点击爱的 uzer, 然后在这呢给用户起个名字啊,比如说就叫李四,然后给他起个密码,比如说一二三,一二三,那这样一个新的用户信息就填好了,这里还可以给用户分配一个这个 呃角色啊,比如说艾特曼呢,是超级管理员啊,这里的 money 特呢,就是一个系统监控的啊等等,我们可以去选,比如说选艾特曼啊,超级管理员,那么这个信息呢,就成功的提入到这里了,最后点击艾的 user, 那么我们这个用户就添加成功了, 但是你发现啊,当我新增一个用户以后,他其实是没有任何访问权的,看到没有?没有 no access, 那这个是因为什么呢?这里有一个说明啊,大家可以看到叫做 virtue house, 虚拟主题 啊,虚拟主机啊,虚拟主机是什么呢?是我们 mq 当中的一种逻辑划分,将来我们会有很多个用户,对吧?那这个埃迪卡斯的用户,他来操作我们的 rabbit mq, 创建自己的队列,自己的交换 机等等,那么我们的李四用户也来做这些事,那这两个人在操作过程中是不是有可能产生冲突啊, 那为了避免这种问题的发生啊,我们就有一个叫 virtue house 虚拟主机的概念,那通过虚拟主机对不同的用户进行一个隔离,大家互相看不到对方的东西,这种呢就称之为多租户的一种处理了,对吧?是一种隔离,这就是虚拟主机的概念了, 虚拟主题目前啊默认只有一个,就是斜杠,那我们还可以通过在这玩手 houses 创建更多的虚拟主题,比如说添加一个新的名字呢,也可以随便取,比如说就叫杠, it cost, 然后在下边呢,还有这个虚拟主机的一些描述信息啊,你也可以写,也可以不写好,点击添加,你看一个新的虚拟主机是不是创建好了,现在我们再次回到 u 字界面,在这个界面当中呢, 我们可以看到艾迪卡斯用户具备杠和艾迪卡斯访问权限,而李四还是没有任何权限,而你点击用户信息了,就会进入用户的视力界面,你可以给他分配权限啊, 怎么分配呢?比如说我让他访问爱迪卡斯特,那这样呢,点击赛的 permission, 他就具备了对爱迪卡斯的这个虚拟主题的访问权了。那我们再次返回到优乐界面,可以看到是不是有访问权了, 然后呢,我们还可以点击到交换机的界面来看一眼,在这个界面当中,你会发现啊,虚拟主机杠和 airicast 他们所具备的信息是不是一样的, 也就是说呢,这里面虽然名称是冲突的,但是呢,因为虚拟主机不同,他们是不是给被隔离开了?一般情况下,同学们,我们回答的跟你见面每一个用户应该由自己独享的虚拟 主题啊,同学们能理解的意思吧,好比方说我的艾迪卡斯的,我点进去我就让他只具备杠的防御拳,不具备这个艾迪卡斯的防御拳,那这样一来,他们两个的业务是不是就被隔离开了,互相就看不到对方的这个内容了? 那这就是我们的这个多租户以及这个虚拟主机的一个隔离了。行了,那 mq 的安装就完成了,下面呢,我们回到 ppt 来看一下我们这个 mq 的一个整体结构啊,刚才其实我们已经给大家做了简单介绍了,这边 population 呢,是我们的消息发送者,看修门呢,自然就是我们消息的消费者, 他们的意思呢,将来会把消息发送到 x change, 也就是我们的交换机,交换机呢,负责路由,再把消息投递到 q 对列,对列负责斩存小。 而后呢,我们的消费者再去从队列当中获取消息,然后处理消息,整体呢,就是这样子的啊,那你会发现呢,这里有一个叫 vershow house 的一个概念虚拟主机吗?将来我创建了一个用户以后, 他呢会有自己的一个虚拟主机,再创业,新的用户再有虚拟主机,那各个虚拟主机之间是相互隔离的,看不到的,这样可以避免干扰,这就是整体 mq 的一个架构了啊, 好,那最后呢,我们来做一个总结, rabbitom q 当中的几个概念,第一呢是 channel, 是操作 mq 的工具啊,你做消息的发送也好,接收也好,都必须要用的 channel。 x change 是消息陆游,将来消息发送给他,他在陆游给对列。对列呢?当然就是缓存消。 记得了 versa house 的虚拟主题是对 q x 限制等等这些东西资源。那个逻辑分组啊,将来呢?给它隔离起来,不同用户可以访问不同的虚拟主题。好,那我们这节课的内容就到这里。


最近段时间啊,很多粉丝反馈,他说投了很多的简历出去,没有什么面试机会,而且啊,即便是有面试机会,面试也基本上通,不过,确实啊,今年的面试难度呢,特别高,所以啊,大家可以趁这段时间好好去积累一下,为经久营食做好准备。哈喽,大家好,我是麦克, 一个没有才华只能靠颜值混饭吃的加号程序员。今天分享的问题是,如何去保证 readymq 的消息的可靠性传输?下面我们来看看普通人和高手的回答,普通人的回答,保证 rabymq 的消息可靠性呢?我认为,呃, 就是在消息发送端去发送到这个消息到那个舍我端的时候,我们可以通过那个同步的一个调用,就是说他可以得到一个反馈结果,这样,这样的话,我去确保我的消息保存到 robinq 的舍我端,然后舍我端,因为他本身有个持久化机制,所以他会持久化下来,这样的话,我只要确保消息发送到舍我端的话,他的消息应该是不会丢失的啊,这样,最后,我只需要去消费消息,那么消费消息的话,呃,消费消息,因为 那个奢望他会去确保消息的一个可靠性。徒弟吗?所以,所以他的消息应该是不会丢失,所以这样的方式应该是可以解决这个问题。嗯, 高手的回答,好的,面试官,这个问题呢,我需要从几个方面来说明,首先啊,在 rap 里面 q 的整个消息投机过程中呢,有三种情况下会存在消息丢失的一个问题。 第一个是生产者把消息发送到 rabymq 的 solo 端的过程中会存在消息流失。第二个是 rapponq 的 solo 端收到消息之后,持久化之前档机导致消息流失。第三 这是消费端收到消息还没有来得及处理之前,当机导致 rappink 收货端会认为这个消息已经被签收,所以这个消息无法被重复投递,导致消息无法消费这样一个问题。所以我认为只需要从这三个维度去保证消息的口号去传递就可以了。从生产者发送消息到收货端的这个角度来说, robin mq 呢,提供了一个 confirm 的消息确认机制,也就是生产者发送消息到收尾端以后呢?如果消息处理成功,收尾端会返回一个 ack 的消息,那么客户端可以根据消息的处理结果来决定是否要对消息进行重新发送,从而去确保消息一定要到达到 rap mq 的收尾端上。 从 rabbitmqso 端的这个角度来说啊,可以开启消息的持久化机制,也就是收到消息之后啊,持久化的词盘里面。那么设置消息的持久化方式有两个步骤,第一个是创建 的时候设置为持久化,第二个是发送消息的时候把消息的投地模式设置为持久化投地。不过虽然设置了持久化消息,但是有可能也会出现问题,比如消息刷新到词盘之前, rabimque 的 ceo 端当机 导致消息丢失的一个问题。所以为了确保万无一失呢,我们需要结合康分消息确认机制来一起使用。从消费者的角度来看啊,我们可以把消息的自动确认机制修改成手动确认,也是说消费端只有手动调用消息确认方法才表示这个消息已经被签收, 那么这种方式呢?可能会造成消息的重复消费的一个问题,所以这里需要去考虑到逆等性的一个设计。以上就是我对这个问题的理解,保证消息的可靠性问题啊,不管是卡不卡 rocketem q 还是 rapton q, 解决方法都是差不多的,而且这些消息中间一定会提供对应的解决办法。这个问 问题啊,只是考察求职者对于 mq 使用上的一些理解,考察不深,但是也是一个比较常见的面试题。好的,本期的普通人 vs 高手的面试系列视频呢,就要知道结束了,大家点赞收藏加关注,我是麦克,我们下期再见!

rep q 如何保证消息不丢失?我们首先来看一下 rep q 它的一个发送流程图啊,作为一个生产者,首先要把这个消息发送给 rep q 里面的一个叫做 exchange 的东东,所以这是它的第一步。然后第二步的话呢,这个消息它会被分发到对应的对应的里面,这是第二步。 然后呢,第三步的话呢,这个消息会被投递到对应的消费者,所以这是 rapling 他的一个工作流程。那么如何确保消息不丢失呢?其实要站在四个点,首先第一个点,我们要确保第一个消息要到 mq, 也就是说你进行发送的时候,至少这条消息要到 mq 里面来。 好,这是第一点。第二点的话呢,我们要确保消息能够路由到正确的队列,因为这个交换器啊,他就是类似于路由器的功能,他负责消息的路由。所以呢,在第二步里面,你要确保这个消息能够路由到正确的队列啊,如果, 如果说没有队列可以入游,这条消息有可能丢失。好,这是第二个。第三个的话呢,就是这个消息要在队列中间进行正确的存储,因为你作为 rapinq, 他有可能发生荡击或者发生重启, 所以呢,一般情况下面我们要把这个消息啊做一定的持久化,所以这是第三步啊。第四步的话呢,就是我们要把确保这个消息啊能够正确的投递到消费者。那么作为 rapinq, 他一般来说默认情况下面就是消息你发送给消费者 consome, 那么我就会把对应的这个消息从消费者里面进行删除, 但是有一种可能性就是消费者在消费消息的时候,他可能抛出了异常,那他一旦抛出异常的时候对不对?这个消息是没有被成功的消费的,但是呢,这个消息却从 reptile 里面进行了删除啊,所以呢,一般来说我们要去控制下异常,以及呢可能要进行一些重发机制。好, 现在我们来进行依次的讲解。首先第一个的话呢,如果你要确保消息到 m q, 你要使用一种发送的模式,叫做发送方确认的模式,那么发送方确认的模式怎么玩呢?我们来看代码, 好,这个呢就是我写的一段 rubbing q 的一个发送的代码啊,你可以看到我写了一个 rust ctrl 呢, 一个专门英语可以在浏览器里面直接访问到的一个 ctrl 呢,一个请求类啊。然后这个时候呢,我们对应一个路径叫做 derek, 如果说你发送 direct 的消息,他会响应在这里,然后我使用 spend boot 里面叫做 rep and q tablet 点 covent and send 这个方法向 rep 进行发送消息啊,发送到一个 direct exchange 交换器,然后对应有对应的建值,有对应的发送消息的本体。这个时候为了确保我相信能够到 q, 应该怎么做呢?那么我们来看一下 replica 的配置。 replica 的配置情况下面大家可以看到,首先的话呢, replica 它会使用连接,在连接的时候我们要设置一个这样的开关啊,这个连接里面叫做 connection factory, 要设置 publish come from 这个开关,把它设置为 two, 也就是意思我们的消息发送确认开关要打开好,这是第一个点,然后第二个点的话呢,在我们发送的这个 template 里面,我们要设置一个消息发送的回调函数,也就是我们要设置一下 come from call back 的回调函数,那么它具体的回调函数是怎么写的呢?来看一下啊。这个地方,当然我是用的一个艾特病,你也可以单独写个累啊。这个呢是 rubbing cutember 里面提供的一个 come from callback 的回调函数,在这个回调函数里面呢,它有一个 come from 的方法。在这个 come from 的方法里面呢,我们拿到对应的 a、 c、 k, 也就是说如果消息已经成功的发送给 mq 了,这个 sk 肯定会等于处,那么这个时候我们可以不处理,或者是稍微打印一下,其实这个消息呢,已经发送给 mq 成功了。 另外的话,如果这个 ack 他不等于处,那么我们就需要去考虑重发。所以呢,这个地方可以进行对应的异常处理,或者是专门写断代码,专门处理来那些发送失败的消息,所以这是我们刚才讲的怎么确保消息到 mq。 好的,我们再来看一下怎么确保我们的消息能够成功的路由到对应的对列,因为你要知道,就算你消息都来 mq, 他只是说消息首先到了这个交换器,但是到了这个交换器之后, 这个消息要路由段对应的对列,如果说你发生了路由失败,一般情况下面啊我们的消息会丢,所以 这个地方我们要开启一个通知,叫做路由失败通知。那么具体怎么处理呢?我们再来看待吗? 好在这个代码中当中我们发到啊,我们点到这个 raptin q 的 template, 这个呢就是消息发送的一个 tempt 类啊,在这个类里面,前面我们已经讲了,这个类里面呢,如果说我们要去设置一个发送失败通知,首先我们要把发送失败通知的开关打开,叫做 set, 叫做 metadare 啊,叫做 set metatory 啊,这个东西我要把它设置为 true, 这个东西设置为 true 呢?是什么意思呢?这个东西叫做开启 失败通知,好,这是第一个,那么设置为 two 之后,紧接着你还要做个事情,因为所有的 m q 里面这种消息的确认啊,他都是走的回调,所以我们还需要去设置 一个回调的函数,叫做 set return call back, 也就是说如果一旦发生了消息的回调失败,这个时候呢,我们可以给他给个方法叫做 return call back 的方法。好,那这个地方就意味着 叫做消息陆游失败,这个时候我们要通知,通知到哪里呢?通知到这个 return call bike 的这一个并里面来, 也就是我定义的一个病的类啊,在这个类里面呢,注意了,有一个方法叫做 return message, 方法,就是一旦发送发生了这种消息路由失败,他就会通知这个方法,在这个方法里面呢,有 message, 有失败的代码对不对?有我们的对应的交换器,有路由键等等之类的啊,所以这里面我我们可以进行一个打印,比如说你如果方法站在这里,那说明他是无法录 路由的消息,这个时候我要考虑重新处理了啊,如果说不考虑处理的话,这个消息他肯定已经丢了,所以呢,这就是如果发生了路由失败,你应该怎么处理的?代码, 好的,我们再回来啊,其实我们还要处理第三个点,就是确保消息能够在消息堆里面进行正确的存储啊,一般来说你要确保消息万无一失, 作为交换器 exchange, 作为对列, ok, 还有了作为对列里面的这个消息,这三者他都需要进行持久化,也就是这三者他都需要保存到磁盘当中,才能确保你 mq, 如果万一发生了 重启发生的荡机,对不对?这个时候我们的消息还可以在 m q 里面进行正确的存储,所以呢,这是第三步,好最后 一步。第四步要确保消息从对列当中进行正确的投递。这个时候一般来说,我们就不让我们的 rap q 走自动提交了,要走一个手动确认,也就是说这个确认呢,要交给消费者来确认,那么具体怎么做呢?我们再来看代码, 首先在这段代码当中,在我的 rapping two 里面的对应的配置里面啊,我定义了一个并,这个并是什么呢?它是一个专门用来手动消费的一个消费者。所以呢,首先 啊,我们这个里面,从这个叫做 simple message listened container 里面,我们拿到一个对应的容器,通过连接工厂拿到对应的容器,一个监听容器, 然后这个监听容器里面呢,我们专门去绑定一个队列,比如说像 q 一好,绑定这个队列之后,我们要设置这个容器,它不是自动提交,所以呢,我们 set acknowledge mode 设置成 menu, 就是一个手动的方式, ok, 然后的话呢,手动方式是不是要交给我们的消费者确认?那给谁确认呢?于是在我们的这个容器里面,我们要设置一个 set message listener, 设置我们具体消费的方法,比如说我这里面定义了一个 resire 的并啊 啊 receiver 的病,在这个 receiver 的病里面,大家可以看到它是实现了一个叫做 china available message listener 的接口,它就是一个专门进行手动消费的一个监听,然后里面有一个方法叫做 on message 方法,就是一旦你监听的这个对着里面有消息,他就会去触发这个 on message 方法进行消费。 在这个 on message 方法里面,大家注意了啊,首先的话呢,我们会拿到对应的 message 消息,还有呢,我们去进行消息通讯的,它对应的信道 china 好,如果这个地方你发生了业务的异常,比如说我们抛出了 exception 怎么办? 那这个时候我们就需要去对这个对应的 channel, 把它设置一个 n a c k, 通过这一个 channel 点 based n a c k 方法设置什么设置我们这个消息了没有消费成功 啊?因为进行消费成功的话是 a c k, 所以说如果正常进行的话呢,我们这个地方甚至 china based a c k 啊,甚至这个消息了已经消费成功了,所以呢,这就是通过手动的方式啊,在你的消费者业务代码里面发生异常的情况,下面我们可以通过手动的方式让 m q 里面进行一个重新派送 啊,当然呢,除了重新派送之外,如果你重新派送失败多次怎么办呢?这个时候一般来说啊, 在 rapinq 里面他也有一个死性消息,就是他会进入一个死性对列里面来进行对列的处理。好,这就是我们刚才所讲的,作为 rapinq 怎么去确保消息做到万无一失不丢失。


今天给大家分享一下这个让 mq 批量消费的一个代码演示啊。首先说一下为什么要去批量消费呢?其实批量消费的话,可以,呃,提高我们的一个消费速度, 然后我们来看一下如何去使用啊,看一下消费者这边我们的一个写法啊。首先说这个注解和普通的一个消费会有所不同,这里我们是指定了他的一个啊,腾腾的 factory 啊,这是我们自己配置的,在这边。 嗯,我们这一配置的这个 factory, 它其实是具备一个批量的一个功能啊,在这边我们要去开启它的一个批量功能,然后并且设置它的相关批量参数 啊,大概的意思就是说我们这边会设置一批是十个消息啊,然后第二个参数是等待的最长的一个时间啊,比如说我这个对列当中只有九个消息,那他不会一直等待,比如说这里设置十秒,那十秒钟之后的话,就算是九个消息,他也会去当成一批来消费。呃,这个参数的一个设置呢,还是要依据于我们的一个业务来进行 一个调整好,然后我们继续看消费者这里,呃,除了这个肯定的 factory 的不同之外,这个进行最终的一个消息消费的时候,这里的一个入餐也变成了一个类似的的一种形式啊,因为是一批消息嘛,然后普通的话可能只是传入一个对象就可以,这个是我们的一个业务对象啊,自己定义的。 呃,然后这里我会去做了相关的一个打印啊,重点是这个消息个数的一个打印,一会测试会用到。嗯,然后这边生产者这里就是普通的借助于这个 rabbit 的一个 temper 里头去进行的一个发送好,然后我们这边看一下测试, 这个单测试这边其实我们啊想的是用呃十五个消息去进行测试啊,首先是啊,十个消息的时候,我们去进行一个日志打印,看他的一个消费情况啊,每个消息发送的时候会去啊,间隔一秒钟,然后我们这边来执行一下, 我们这边测试用力跑完了,我们来看一下他的一个输出啊。首先说我们看这个第一行输出啊,是在五十三秒的时候进行了一个输出发送十个就是这里的输出。那 五十三秒输出完了,发送十个之后呢,这里紧接着就开始了一个消费,然后消费的消费个数是十个啊,这里说明其实他是等待着呃队列当中有十个消息之后,他才会去进行一个消费,对吧? 嗯,那后面的第三行输出是在第五十八秒的时候输出的发送成功啊,这里是因为我们有十五条的一个消息啊,就等待了五秒把剩下的五条发完了。但是呢,他没有紧接着去进行一个消费,而是在零八秒的时候去进行一个消费,等待了一个十秒,因为这里的话他是不满足于我们配置的一个十个一批的规则,而只能等待了十秒钟之后再去进行一个消费, 消费的一个个数啊,是五个。好,那这就是关于这个批量消费的一个分享啊,有需要代码的小伙伴可以去主页找一下啊,今天的分享就到这里,我们下期再见。