RabbitMQ概念和应用详解
RabbitMQ概述
RabbitMQ可以做什么?
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,可用于在分布式系统中存储转发消息,主要有以下的技术亮点:
- 可靠性
- 灵活的路由
- 集群部署
- 高可用的队列消息
- 可视化的管理工具
RabbitMQ主要用于系统间的双向解耦,当生产者(productor)产生大量的数据时,消费者(consumer)无法快速的消费信息,那么就需要一个类似于中间件的代理服务器,用来处理和保存这些数据,RabbitMQ就扮演了这个角色。
如何使用RabbitMQ
- Erlang语言包
- RabbitMQ安装包
基本概念
1.Broker
用来处理数据的消息队列服务器实体
2.虚拟主机(vhost)
由RabbitMQ服务器创建的虚拟消息主机,拥有自己的权限机制,一个broker里可以开设多个vhost,用于不同用户的权限隔离,vhost之间是也完全隔离的。
3.生产者(productor)
产生用于消息通信的数据
4.信道(channel)
消息通道,在AMQP中可以建立多个channel,每个channel代表一个会话任务。
5.交换机(exchange)
(1)接受消息,转发消息到绑定的队列,总共有四种类型的交换器:direct,fanout,topic,headers。
- direct:转发消息到routing-key指定的队列
fanout:转发消息到所有绑定的队列,类似于一种广播发送的方式。
topic:按照规则转发消息,这种规则多为模式匹配,也显得更加灵活
(2).交换器在RabbitMQ中是一个存在的实体,不能改变,如有需要只能删除重建。
(3).topic类型的交换器利用匹配规则分析消息的routing-key属性。
(4).属性
- 持久性:声明时durable属性为true
- 自动删除:绑定的queue删除也跟着删除
- 惰性:不会自动创建
6.队列(queue)
(1).队列是RabbitMQ的内部对象,存储消息
(2).可以动态的增加消费者,队列将接受到的消息以轮询(round-robin)的方式均匀的分配给多个消费者
(3).队列的属性
- 持久性:如果启用,队列将会在server重启之前有效
- 自动删除:消费者停止使用之后就会自动删除
- 惰性:不会自动创建
- 排他性:如果启用,队列只能被声明它的消费者使用。
7.两个key
- routing-key:消息不能直接发到queues,需要先发送到exchanges,routing-key指定queues名称,exchanges通过routing-key来识别与之绑定的queues
1 2 3 |
|
- binding-key:主要是用来表示exchanges和queues之间的关系,为了区别queue_publish的routing-key,就称作binding-key。
1 2 3 |
|
8.绑定(binding)
表示交换机和队列之间的关系,在进行绑定时,带有一个额外的参数binding-key,来和routing-key相匹配。
9.消费者(consumer)
监听消息队列来进行消息数据的读取
10.高可用性(HA)
(1).在consumer处理完消息后,会发送消息ACK,通知通知RabbitMQ消息已被处理,可以从内存删除。如果消费者因宕机或链接失败等原因没有发送ACK,则RabbitMQ会将消息重新发送给其他监听在队列的下一个消费者。
1
|
|
(2).消息和队列的持久化
(3).镜像队列,实现不同节点之间的元数据和消息同步
RabbitMQ在OpenStack中的应用
RPC之neutron专题
基于RabbitMQ的RPC消息通信是neutron中跨模块进行方法调用的很重要的一种方式,根据上面的描述,要组成一个完整的RPC通信结构,需要信息的生产者和消费者。
- client端:用于产生rpc消息。
- server端:用于监听消息数据并进行相应的处理。
1.neutron-agent中的RPC
在dhcp_agent、l3_agent、metadata_agent,metering_agent的main函数中都存在一段创建一个rpc服务端的代码,下面以dhcp_agent为例。
1 2 3 4 5 6 7 8 9 10 |
|
最核心的,也是跟rpc相关的部分包括两部分,首先是创建rpc服务端。
1 2 3 4 5 |
|
该代码实际上创建了一个rpc服务端,监听指定的topic并运行manager上的tasks。
create()方法返回一个neutron.service.Service对象,neutron.service.Service继承自neutron.common.rpc.Service类。
首先看neutron.common.rpc.Service类,该类定义了start方法,该方法主要完成两件事情:一件事情是将manager添加到endpoints中;一件是创建rpc的consumer,分别监听topic的队列消息。
而在neutron.service.Service类中,初始化中生成了一个manager实例(即neutron.agent.dhcp_agent.DhcpAgentWithStateReport);并为start方法添加了周期性执行report_state方法和periodic_tasks方法。report_state方法没有具体实现,periodic_tasks方法则调用manager的periodic_tasks方法。
manager实例(即neutron.agent.dhcp_agent.DhcpAgentWithStateReport)在初始化的时候首先创建一个rpc的client端,通过代码
1
|
|
该client端实际上定义了report_state方法,可以状态以rpc消息的方式发送给plugin。
manager在初始化后,还会指定周期性运行_report_state方法,实际上就是调用client端的report_state方法。
至此,对rpc服务端的创建算是完成了,之后执行代码。
1
|
|
service.launch(server)方法首先会将server放到协程组中,并调用server的start方法来启动server。
2.neutron-plugin中的RPC
主要对ML2Plugin进行分析,包括两个类:RpcCallbacks和AgentNotifierApi。
RpcCallbacks:负责当agent往plugin发出rpc请求时候,plugin实现请求的相关动作,除了继承自父类(dhcp rpc、dvr rpc、sg_db rpc和tunnel rpc)中的方法,还包括get_port_from_device、get_device_details、get_devices_details_list、update_device_down、update_device_up、get_dvr_mac_address_by_host、get_compute_ports_on_host_by_subnet、get_subnet_for_dvr等方法。
AgentNotifierApi:负责当plugin往agent发出rpc请求(plugin通知agent)的时候,plugin端的方法。
1 2 3 4 5 6 7 |
|
创建一个通知rpc的客户端,用于向OVS的agent发出通知。所有plugin都需要有这样一个发出通知消息的客户端,创建了一个OVS agent的通知rpc客户端。之后,创建两个跟service agent相关的consumer,分别监听topics.PLUGIN
ovs_neutron_agent也会创建RPC的consumer,用来监听topics.UPDATE、topics.DELETE等操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
|
3.neutron-server中的RPC
这个rpc服务端主要通过neutron.server中主函数中代码执行
1
|
|
方法的实现代码(目录:neutron/neutron/service.py)如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
|
其中,RpcWorker(plugin)主要通过调用plugin的方法来创建rpc服务端,最重要的工作是调用plugin的start_rpc_listeners来监听消息队列:
1
|
|
该方法在大多数plugin中并未被实现,目前ml2支持该方法。
在neutron.plugin.ml2.plugin.ML2Plugin类中,该方法创建了一个topic为topics.PLUGIN的消费rpc。
1 2 3 4 5 6 7 8 |
|
RPC之nova专题
在Openstack中,每一个Nova服务初始化时会创建两个队列,一个名为“NODE-TYPE.NODE-ID”,另一个名为“NODE-TYPE”,NODE-TYPE是指服务的类型,NODE-ID指节点名称。
1.nova中实现exchange的种类
- direct:初始化中,各个模块对每一条系统消息自动生成多个队列放入RabbitMQ服务器中,队列中绑定的binding-key要与routing-key匹配
- topic:各个模块也会自动生成两个队列放入RabbitMQ服务器中。
2.nova中调用RPC的方式
- RPC.CALL:用于请求和响应方式
- RPC.CAST:只是提供单向请求
3.nova中模块的逻辑功能
- Invoker:向消息队列中发送系统请求信息,如Nova-API和Nova-Scheduler,通过RPC.CALL和RPC.CAST两个进程发送系统请求消息。
- Worker:从消息队列中获取Invoker模块发送的系统请求消息以及向Invoker模块回复系统响应消息,如Nova-Compute、Nova-Volume和Nova-Network,对RPC.CALL做出响应。
4.nova中的exchange domain
direct exchange domain: Topic消息生产者(Nova-API或者Nova-Scheduler)与Topic交换器生成逻辑连接,通过PRC.CALL或者RPC.CAST进程将系统请求消息发往Topic交换器。交换器根据不同的routing-key将系统请求消息转发到不同的类型的消息队列。Topic消息消费者探测到新消息已进入响应队列,立即从队列中接收消息并调用执行系统消息所请求的应用程序。
点到点消息队列:Topic消息消费者应用程序接收RPC.CALL的远程调用请求,并在执行相关计算任务之后将结果以系统响应消息的方式通过Direct交换器反馈给Direct消息消费者。
共享消息队列:Topic消息消费者应用程序只是接收RPC.CAST的远程调用请求来执行相关的计算任务,并没有响应消息反馈。
topic exchange domain: Direct交换域并不是独立运作,而是受限于Topic交换域中RPC.CALL的远程调用流程与结果,每一个RPC.CALL激活一次Direct消息交换的运作。
以nova启动虚拟机的过程为例,详细介绍RPC通信过程。
RPC.CAST缺少了系统消息响应流程。一个Topic消息生产者发送系统请求消息到Topic交换器,Topic交换器根据消息的Routing Key将消息转发至共享消息队列,与共享消息队列相连的所有Topic消费者接收该系统请求消息,并把它传递给响应的Worker进行处理,其调用流程如图所示: