(二)RabbitMQ消息队列-RabbitMQ消息队列架构与基本概念

2017-12-30 11:37:59来源:oschina作者:残刃O人点击

分享

文是简单介绍一下RabbitMQ,参考官网上的教程。同时加入了一些自己的理解。官网教程详见:"Hello World!"。


引言

你是否遇到过多个系统间需要通过定时任务来同步某些数据?


你是否在为异构系统的不同进程间相互调用、通讯的问题而苦恼、挣扎?


如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题。消息服务擅长于解决多系统、异构系统间的数据交换(消息通知/通讯)问题。


本文将要介绍的RabbitMQ就是当前最主流的消息中间件之一。


RabbitMQ简介

MQ(Message Queue,消息队列)是一种应用系统之间的通信方法。是通过读写出入队列的消息来通信(RPC则是通过直接调用彼此来通信的)。


AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。


AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。


RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。


下面通过生产者代码来解释一下RabbitMQ中涉及到的概念。


Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map arguments) throws IOException;durable:true、false true:在服务器重启时,能够存活
exclusive :是否为当前连接的专用队列,在连接断开后,会自动删除该队列,生产环境中应该很少用到吧。
autodelete:当没有任何消费者使用时,自动删除该队列。this means that the queue will be deleted when there are no more processes consuming messages from it. public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
//RabbitMQ所在主机ip或者主机名
factory.setHost("192.168.20.87");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
//创建连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//指定一个队列 第二个参数代表在服务器重启时是否能够存活,
//第三个参数:是否为当前连接的专用队列,在连接断开后,会自动删除该队列,生产环境中应该很少用到吧。
//当没有任何消费者使用时,自动删除该队列。
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String msg = "hello world ! 刃";
//发送一条消息
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println(" [x] Sent '" + msg + "'");
channel.close();
connection.close();
}
ConnectionFactory、Connection、Channel

ConnectionFactory、Connection、Channel,这三个都是RabbitMQ对外提供的API中最基本的对象。不管是服务器端还是客户端都会首先创建这三类对象。ConnectionFactory为Connection的制造工厂。


Connection是与RabbitMQ服务器的socket链接,它封装了socket协议及身份验证相关部分逻辑。


Channel是我们与RabbitMQ打交道的最重要的一个接口,大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。


Queue

Queue(队列)是RabbitMQ的内部对象,用于存储消息,用下图表示。



RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费。



队列是有Channel声明的,而且这个操作是幂等的。同名的队列多次声明也只会创建一次。我们发送消息就是想这个声明的队列里发送消息。


看一下消费者的代码:


public class MsgReceiver {
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
//RabbitMQ所在主机ip或者主机名
factory.setHost("192.168.20.87");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
//指定一个队列 第二个参数代表在服务器重启时是否能够存活,
//第三个参数:是否为当前连接的专用队列,在连接断开后,会自动删除该队列,生产环境中应该很少用到吧。
//当没有任何消费者使用时,自动删除该队列。
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 创建队列消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 指定消费队列
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
// nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;autoAck:是否自动ack,如果不自动ack,需要使用channel.ack、channel.nack、channel.basicReject 进行消息应答

从上述代码中,我们可以看到ConnectionFactory、Connection、Channel这三个对象都还是会创建。而队列在消费者这里又声明了一遍。这是为了防止先启动消费者,当为消费者指定队列时,如果RabbitMQ服务器上未声明过队列,就会抛出IO异常。


QueueingConsumer

队列消费者,用于监听队列中的消息。调用nextDelivery方法时,内部实现就是调用队列的take方法。该方法的作用:获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。说白了就是如果没有消息,就处于阻塞状态。


运行结果如下:(生产者、消费者谁先运行都可以)


============================================================

RabbitMQ应用架构


从图中可以看出RabbitMQ主要由Exchange和Queue两部分组成,然后通过RoutingKey关联起来,消息投递到Exchange然后通过Queue接收。


RabbitMQ消息队列基本概念

RabbitMQ Server:也叫broker server,它是一种传输服务。 他的角色就是维护一条从Producer到Consumer的路线,保证数据能够按照指定的方式进行传输。但是这个保证也不是100%的保证,但是对于普通的应用来说这已经足够了。当然对于商业系统来说,可以再做一层数据一致性的guard,就可以彻底保证系统的一致性了。


Producer:消息生产者,如图A、B、C,数据的发送方。消息生产者连接RabbitMQ服务器然后将消息投递到Exchange。


Consumer:消息消费者,如图1、2、3,数据的接收方。消息消费者订阅队列,RabbitMQ将Queue中的消息发送到消息消费者。


Exchange:生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。Exchange并不存储消息。RabbitMQ中的Exchange有fanout、direct、topic、headers四种类型,每种类型对应不同的路由规则,后面详细介绍这四种类型。


Queue:(队列)是RabbitMQ的内部对象,用于存储消息。消息消费者就是通过订阅队列来获取消息的,RabbitMQ中的消息都只能存储在Queue中,生产者生产消息并最终投递到Queue中,消费者可以从Queue中获取消息并消费。多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。


RoutingKey:生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。RabbitMQ为routing key设定的长度限制为255 bytes。


Connection:(连接)。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个TCP连接。


Channels:(信道)。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。


Exchange Types:

fanout

fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。 生产者发送到Exchange的所有消息都会路由到绑定的Queue,并最终被两个消费者消费。

direct

direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。(在实际使用RabbitMQ的过程中并没有binding key这个参数,只有routing key,为了区分我们把交换机和队列绑定时传的参数叫binding key,把发送消息时带的这个参数叫routing key)

topic

前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但direct是完全匹配,而通过topic可以进行模糊匹配



routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” binding key与routing key一样也是句点号“. ”分隔的字符串 binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)


headers

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。 在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。(实际中我并有用过)

最新文章

123

最新摄影

闪念基因

微信扫一扫

第七城市微信公众平台