本文从总体上介绍RabbitMq及其核心概念,是入门必备的参考资料。

1、AMQP协议

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

2、RabbitMq核心概念

Broker

简单来说就是消息队列服务器实体。

producer

消息生产者,就是投递消息的程序。

consumer

消息消费者,就是接受消息的程序。

vhost

虚拟主机,一个broker里可以开设多个vhost,用作权限分离,把不同的系统使用的rabbitmq区分开,共用一个消息队列服务器,但看上去就像各自在用不用的rabbitmq服务器一样。

Connection

一个网络连接,比如TCP/IP套接字连接。

channel

消息通道,是建立在真实的TCP连接内的虚拟连接(是我们与RabbitMQ打交道的最重要的一个接口)。仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的,需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。AMQP的命令都是通过信道发送出去的(我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。)。每条信道都会被指派一个唯一ID。在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务,理论上无限制,减少TCP创建和销毁的开销,实现共用TCP的效果。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。

注1:一个生产者或一个消费者与MQ服务器之间只有一条TCP连接

注2:RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。

Exchange

消息交换机,生产者不是直接将消息投递到Queue中的,实际上是生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。

Exchange Types

RabbitMQ常用的Exchange Type有fanout、direct、topic、headers这四种(AMQP规范里还提到两种Exchange Type,分别为system与自定义,这里不予以描述),之后会分别进行介绍。

Queue

消息队列载体,每个消息都会被投入到一个或多个队列。

Queue(队列)是RabbitMQ的内部对象,用于存储消息,用下图表示。

RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费。

多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

Binding

绑定,它的作用就是把exchange和queue按照路由规则绑定起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。

Routing Key

路由关键字,生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。

在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。

Prefetch count

前面我们讲到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。

3、消息队列的使用过程

在AMQP模型中,Exchange是接受生产者消息并将消息路由到消息队列的关键组件。ExchangeType和Binding决定了消息的路由规则。所以生产者想要发送消息,首先必须要声明一个Exchange和该Exchange对应的Binding。

在Rabbit MQ中,声明一个Exchange需要三个参数:ExchangeName,ExchangeType和Durable。ExchangeName是该Exchange的名字,该属性在创建Binding和生产者通过publish推送消息时需要指定。ExchangeType,指Exchange的类型,在RabbitMQ中,有三种类型的Exchange:direct ,fanout和topic,不同的Exchange会表现出不同路由行为。Durable是该Exchange的持久化属性,这个会在消息持久化章节讨论。

建立一个Exchange

声明一个Binding

声明一个Binding需要提供一个QueueName,ExchangeName和BindingKey。

消息发送的过程

  1. 建立连接Connection。由producer和consumer创建连接,连接到broker的物理节点上。
  2. 建立消息Channel。Channel是建立在Connection之上的,一个Connection可以建立多个Channel。producer连接Virtual Host 建立Channel,Consumer连接到相应的queue上建立Channel。
  3. 发送消息。由Producer发送消息到Broker中的Exchange中。
  4. 路由转发。生产者Producer在发送消息时,都需要指定一个RoutingKey和Exchange,Exchange收到消息后可以看到消息中指定的RoutingKey,再根据当前Exchange的ExchangeType,按一定的规则将消息转发到相应的queue中去。
  5. 消息接收。Consumer会监听相应的queue,一旦queue中有可以消费的消息,queue就将消息发送给Consumer端。
  6. 消息确认。当Consumer完成某一条消息的处理之后,需要发送一条ACK消息给对应的Queue。Queue收到ACK信息后,才会认为消息处理成功,并将消息从Queue中移除;如果在对应的Channel断开后,Queue没有收到这条消息的ACK信息,该消息将被发送给另外的Channel。至此一个消息的发送接收流程走完了。消息的确认机制提高了通信的可靠性。

4、exchange 与 Queue 的路由机制

Exchange根据消息的Routing Key和Exchange绑定Queue的Binding Key分配消息。生产者在将消息发送给Exchange的时候,一般会指定一个Routing Key,来指定这个消息的路由规则,而这个Routing Key需要与Exchange Type及Binding Key联合使用才能最终生效。 在Exchange Type与Binding Key固定的情况下(一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定Routing Key来决定消息流向哪里。

exchange 将消息发送到哪一个queue是由exchange type 和bing 规则决定的,目前常用的有3种exchange,Direct exchange, Fanout exchange, Topic exchange 。Direct exchange 直接转发路由,其实现原理是通过消息中的routkey,与queue 中的routkey 进行比对,若二者匹配,则将消息发送到这个消息队列。通常使用这个。

Direct exchange 路由

以上图的配置为例,我们以routingKey=”error”发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b…,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl…);如果我们以routingKey=”info”或routingKey=”warning”来发送消息,则消息只会路由到Queue2。如果我们以其他routingKey发送消息,则消息不会路由到这两个Queue中。

Fanout exchange 复制分发路由

该路由不需要routkey,当exchange收到消息后,将消息复制多份转发给与自己绑定的消息队列。

上图中,生产者(P)发送到Exchange(X)的所有消息都会路由到图中的两个Queue,并最终被两个消费者(C1与C2)消费。

topic exchange 通配路由

topic exchange 通配路由,是direct exchange的通配符模式,消息中的routkey可以写成通配的模式,exchange支持“#”和“*” 的通配。收到消息后,将消息转发给所有符合匹配表达式的queue。

以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,routingKey=”lazy.orange.fox”的消息会路由到Q1,routingKey=”lazy.brown.fox”的消息会路由到Q2,routingKey=”lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey。

需要注意的一点只有queue具有 保持消息的功能,exchange不能保存消息。

headers 路由

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。

5、RPC

MQ本身是基于异步的消息处理,生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败,甚至连有没有消费者来处理这条消息都不知道。但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。

RabbitMQ中实现RPC的机制是: 一. 生产者发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14个属性,这些属性会随着消息一起发送)中设置两个属性值replyTo(一个Queue名称,用于告诉消费者处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,消费者处理完成后需要将此属性返还,生产者将根据这个id了解哪条请求被成功执行了或执行失败)。 二. 消费者收到消息并处理。 三. 消费者处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性。 四. 生产者之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理。

6、消息确认:Message acknowledgment

在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在Timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。 这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的问题,Queue中堆积的消息会越来越多,消费者重启后会重复消费这些消息并重复执行业务逻辑。 如果我们采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不管是否处理完成,RabbitMQ会立即把这个Message标记为完成,然后从queue中删除了。

7、durability 持久化与非持久化队列

如上图,在Features字段里有一个D,就是持久化队列,英文durable(持久的)。

持久化队列会被保存在磁盘中,固定并持久的存储,当Rabbit服务重启后,该队列会保持原来的状态在RabbitMQ中被管理,而非持久化队列不会被保存在磁盘中,Rabbit服务重启后队列就会消失。

如果需要队列的完整性,数据在队列中的保存是必须不允许丢失的,那么可以使用持久化。而当需要获取的信息是实时的,或者是随机的信息,不需要信息的精确性或完整性,但是追求获取性能,可以选择非持久化队列。

8、分发机制

我们在应用程序使用消息系统时,一般情况下生产者往队列里插入数据时速度是比较快的,但是消费者消费数据往往涉及到一些业务逻辑处理导致速度跟不上生产者生产数据。因此如果一个生产者对应一个消费者的话,很容易导致很多消息堆积在队列里。这时,就得使用工作队列了。一个队列有多个消费者同时消费数据。 工作队列有两种分发数据的方式:轮询分发(Round-robin)和 公平分发(Fair dispatch)。轮询分发:队列给每一个消费者发送数量一样的数据。公平分发:消费者设置每次从队列中取一条数据,并且消费完后手动应答,继续从队列取下一个数据。

①.轮询分发:Round-robin dispatching

如果工作队列中有两个消费者,两个消费者得到的数据量一样的,并不会因为两个消费者处理数据速度不一样使得两个消费者取得不一样数量的数据。但是这种分发方式存在着一些隐患,消费者虽然得到了消息,但是如果消费者没能成功处理业务逻辑,在RabbitMQ中也不存在这条消息。就会出现消息丢失并且业务逻辑没能成功处理的情况。

②.公平分发:Fair dispatch

消费者设置每次从队列里取一条数据,并且关闭自动回复机制,每次取完一条数据后,手动回复并继续取下一条数据。与轮询分发不同的是,当每个消费都设置了每次只会从队列取一条数据时,并且关闭自动应答,在每次处理完数据后手动给队列发送确认收到数据。这样队列就会公平给每个消息费者发送数据,消费一条再发第二条,而且可以在管理界面中看到数据是一条条随着消费者消费完从而减少的,并不是一下子全部分发完了。采用公平分发方式就不会出现消息丢失并且业务逻辑没能成功处理的情况。

9、事务

对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,因为consume命令本身没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器接收。当然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务。

10、Confirm机制

使用事务固然可以保证只有提交的事务,才会被服务器执行。但是这样同时也将客户端与消息服务器同步起来,这背离了消息队列解耦的本质。Rabbit MQ提供了一个更加轻量级的机制来保证生产者可以感知服务器消息是否已被路由到正确的队列中——Confirm。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配一个唯一的ID,然后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。Confirm机制的最大优点在于异步,生产者在发送消息以后,即可继续执行其他任务。而服务器返回Confirm后,会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。如果消息服务器发生异常,导致该消息丢失,会返回给生产者一个nack,表示消息已经丢失,这样生产者就可以通过重发消息,保证消息不丢失。Confirm机制在性能上要比事务优越很多。但是Confirm机制,无法进行回滚,就是一旦服务器崩溃,生产者无法得到Confirm信息,生产者其实本身也不知道该消息是否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统需要支持去重。

11、Alternate Exchange(代替交换器)

Alternate Exchange是Rabbitmq自己扩展的功能,不是AMQP协议定义的。 创建Exchange指定该Exchange的Alternate Exchange,发送消息的时候如果Exchange没有成功把消息路由到队列中去,这就会将此消息路由到Alternate Exchange属性指定的Exchange上了。需要在创建Exchange时添加alternate-exchange属性。如果Alternate Exchange也没能成功把消息路由到队列中去,这个消息就会丢失。可以触发publish confirm机制,表示这个消息没有确认。 创建交换器时需要指定如下属性

Map<String,Object> argsMap = new HashMap<>(); 

argsMap.put(“alternate-exchange”,“Alternate Exchange Name”);

12、TTL(生存时间)

RabbitMQ允许您为消息和队列设置TTL(生存时间)。 可以使用可选的队列参数或策略完成(推荐使用后一个选项)。 可以为单个队列,一组队列或单个消息应用消息TTL。

设置消息的过期时间

MessageProperties messageProperties = new MessageProperties(); 

messageProperties.setExpiration(“30000”);

设置队列中消息的过期时间 在声明一个队列时,可以指定队列中消息的过期时间,需要添加x-message-ttl属性。

Map<String, Object> arguments = new HashMap<>(); 

arguments.put(“x-message-ttl”,30000); 

如果同时制定了Message TTL,Queue TTL,则时间短的生效。

13、Queue Length Limit(队列长度限制)

可以设置队列中消息数量的限制,如果测试队列中最多只有5个消息,当第六条消息发送过来的时候,会删除最早的那条消息。队列中永远只有5条消息。 使用代码声明含有x-max-length和x-max-length-bytes属性的队列 Max length(x-max-length) 用来控制队列中消息的数量。 如果超出数量,则先到达的消息将会被删除掉。

Max length bytes(x-max-length-bytes) 用来控制队列中消息总的大小。 如果超过总大小,则最先到达的消息将会被删除,直到总大小不超过x-max-length-byte为止。

Map<String, Object> arguments = new HashMap<>(); //表示队列中最多存放三条消息 

arguments.put(“x-max-length”,3); 

Map<String, Object> arguments = new HashMap<>(); //队列中消息总的空间大小 

arguments.put(“x-max-length-bytes”,10);

14、Dead Letter Exchange(死信交换器)

在队列上指定一个Exchange,则在该队列上发生如下情况, 1.消息被拒绝(basic.reject or basic.nack),且requeue=false 2.消息过期而被删除(TTL) 3.消息数量超过队列最大限制而被删除 4.消息总大小超过队列最大限制而被删除

就会把该消息转发到指定的这个exchange 需要定义了x-dead-letter-exchange属性,同时也可以指定一个可选的x-dead-letter-routing-key,表示默认的routing-key,如果没有指定,则使用消息原来的routeing-key进行转发

当定义队列时指定了x-dead-letter-exchange(x-dead-letter-routing-key视情况而定),并且消费端执行拒绝策略的时候将消息路由到指定的Exchange中去。 我们知道还有二种情况会造成消息转发到死信队列。 一种是消息过期而被删除,可以使用这个方式使的rabbitmq实现延迟队列的作用。还有一种就是消息数量超过队列最大限制而被删除或者消息总大小超过队列最大限制而被删除

15、priority queue(优先级队列)

声明队列时需要指定x-max-priority属性,并设置一个优先级数值

消息优先级属性

MessageProperties messageProperties = new MessageProperties(); 

messageProperties.setPriority(priority);

如果设置的优先级小于等于队列设置的x-max-priority属性,优先级有效。 如果设置的优先级大于队列设置的x-max-priority属性,则优先级失效。

创建优先级队列,需要增加x-max-priority参数,指定一个数字。表示最大的优先级,建议优先级设置为1~10之间。 发送消息的时候,需要设置priority属性,最好不要超过上面指定的最大的优先级。 如果生产端发送很慢,消费者消息很快,则有可能不会严格的按照优先级来进行消费。 第一,如果发送的消息的优先级属性小于设置的队列属性x-max-priority值,则按优先级的高低进行消费,数字越高则优先级越高。 第二,如果发送的消息的优先级属性都大于设置的队列属性x-max-priority值,则设置的优先级失效,按照入队列的顺序进行消费。 第三,如果消费端一直进行监听,而发送端一条条的发送消息,优先级属性也会失效。

16、延迟队列

顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。 延迟队列多用于需要延迟工作的场景。最常见的是以下两种场景:

①延迟消费。

比如: 用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单。 用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。

②延迟重试。

比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。

我们可以利用RabbitMQ的两个特性,一个是Time-To-Live Extensions,另一个是Dead Letter Exchanges。实现延迟队列。

Time-To-Live Extensions RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。

Dead Letter Exchange 刚才提到了,被设置了TTL的消息在过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式: 1.消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false。 2.消息因为设置了TTL而过期。 3.消息进入了一条已经达到最大长度的队列。 如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。

更多推荐

RabbitMq面试必备基础知识