警告
本文最后更新于 2020-09-11,文中内容可能已过时。
RabbitMQ
是目前非常热门的一款消息中间件,不管是互联网大厂还是中小企业都在大量使用。作为一名合格的开发者,有必要对RabbitMQ
有所了解。
安装
配置yum源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
vim /etc/yum.repos.d/rabbitmq-erlang.repo
[bintray-rabbitmq-server]
name=bintray-rabbitmq-rpm
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/
gpgcheck=0
repo_gpgcheck=0
enabled=1
[rabbitmq-erlang]
name=rabbitmq-erlang
baseurl=https://dl.bintray.com/rabbitmq-erlang/rpm/erlang/22/el/7
gpgcheck=1
gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
repo_gpgcheck=0
enabled=1
|
下载版本:
https://dl.bintray.com/rabbitmq/all/rabbitmq-server/
安装:yum install rabbitmq-server-3.8.8-1.el6.noarch.rpm
yum安装:
yum install erlang
yum install rabbitmq-server-3.8.9
开启远程访问:rabbitmq-plugins enable rabbitmq_management
命令行操作
创建账号密码:rabbitmqctl add_user test passwd
设置该用户为administrator角色:rabbitmqctl set_user_tags test administrator
设置权限:rabbitmqctl set_permissions -p '/' test '.' '.' '.'
清空队列:rabbitmqctl purge_queue 队列
删除队列:rabbitmqctl delete_queue 队列
端口介绍
- 5672:编程语言客户端连接端口
- 15672:管理界面端口
- 25672:集群的端口
卸载
rpm -qa |grep rabbitmq
rpm -e rabbitmq-server
生产和消费(P2P)
生产者(producter):队列消息的产生者,负责生产消息,并将消息传入队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import pika
import json
credentials = pika.PlainCredentials('shampoo', '123456') # mq用户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
# 声明消息队列,消息将在这个队列传递,如不存在,则创建
result = channel.queue_declare(queue = 'python-test')
for i in range(10):
message=json.dumps({'OrderId':"1000%s"%i})
# 向队列插入数值 routing_key是队列名
channel.basic_publish(exchange = '',routing_key = 'python-test',body = message)
print(message)
connection.close()
|
消费者(consumer):队列消息的接收者,负责 接收并处理 消息队列中的消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import pika
credentials = pika.PlainCredentials('shampoo', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
# 申明消息队列,消息在这个队列传递,如果不存在,则创建队列
channel.queue_declare(queue = 'python-test', durable = False)
# 定义一个回调函数来处理消息队列中的消息,这里是打印出来
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag = method.delivery_tag)
print(body.decode())
# 告诉rabbitmq,用callback来接收消息
channel.basic_consume('python-test',callback)
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
channel.start_consuming()
|
持久化
MQ默认建立的是临时 queue 和 exchange,如果不声明持久化,一旦 rabbitmq 挂掉,queue、exchange 将会全部丢失。
- queue 声明持久化
1
2
|
# 声明消息队列,消息将在这个队列传递,如不存在,则创建。durable = True 代表消息队列持久化存储,False 非持久化存储
result = channel.queue_declare(queue = 'python-test',durable = True)
|
- exchange 声明持久化
1
2
|
# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建.durable = True 代表exchange持久化存储,False 非持久化存储
channel.exchange_declare(exchange = 'python-test', durable = True)
|
注意:如果已存在一个非持久化的 queue 或 exchange ,执行上述代码会报错,因为当前状态不能更改 queue 或 exchange 存储属性,需要删除重建。如果 queue 和 exchange 中一个声明了持久化,另一个没有声明持久化,则不允许绑定。
- 消息持久化
虽然 exchange 和 queue 都申明了持久化,但如果消息只存在内存里,rabbitmq 重启后,内存里的东西还是会丢失。所以必须声明消息也是持久化,从内存转存到硬盘。
1
2
|
# 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化
channel.basic_publish(exchange = '',routing_key = 'python-test',body = message,properties=pika.BasicProperties(delivery_mode = 2))
|
- acknowledgement消息不丢失
消费者(consumer)调用callback函数时,会存在处理消息失败的风险,如果处理失败,则消息丢失。但是也可以选择消费者处理失败时,将消息回退给rabbitmq,重新再被消费者消费,这个时候需要设置确认标识。
1
2
|
channel.basic_consume(callback,queue = 'python-test',
# no_ack 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉no_ack = False)
|
发布与订阅(广播)
fanout
传递到exchange的消息将会转发到所有与其绑定的queue上
- 不需要指定routing_key,即使指定了也是无效。
- 需要提前将exchange和queue绑定,一个exchange可以绑定多个queue,一个queue可以绑定多个exchange。
- 需要先启动订阅者,此模式下的队列是consumer随机生成的,发布者仅仅发布消息到 exchange,由exchange转发消息至queue。
发布者:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import pika
import json
credentials = pika.PlainCredentials('shampoo', '123456') # mq用户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储
channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout')for i in range(10):
message=json.dumps({'OrderId':"1000%s"%i})
# 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化。routing_key 不需要配置
channel.basic_publish(exchange = 'python-test',routing_key = '',body = message,
properties=pika.BasicProperties(delivery_mode = 2))
print(message)
connection.close()
|
订阅者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import pika
credentials = pika.PlainCredentials('shampoo', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
# 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除
result = channel.queue_declare('',exclusive=True)
# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储
channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout')
# 绑定exchange和队列 exchange 使我们能够确切地指定消息应该到哪个队列去
channel.queue_bind(exchange = 'python-test',queue = result.method.queue)
# 定义一个回调函数来处理消息队列中的消息,这里是打印出来
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag = method.delivery_tag)
print(body.decode())
channel.basic_consume(result.method.queue,callback,# 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉
auto_ack = False)
channel.start_consuming()
|
direct(组播)
消息发送至exchange,exchange根据路由键(routing_key)转发到相对应的queue上
- 可以使用默认exchange =’ ‘,也可以自定义exchange
- 这种模式下不需要将exchange和任何进行绑定,当然绑定也是可以的。可以将exchange和 queue ,routing_key和queue进行绑定
- 传递或接受消息时需要指定routing_key
- 需要先启动订阅者,此模式下的队列是consumer随机生成的,发布者仅仅发布消息到 exchange,由exchange转发消息至queue。
发布者:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
import pika
import json
credentials = pika.PlainCredentials('shampoo', '123456') # mq用户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储
channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct')
for i in range(10):
message=json.dumps({'OrderId':"1000%s"%i})
# 指定 routing_key。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化
channel.basic_publish(exchange = 'python-test',routing_key = 'OrderId',body = message,
properties=pika.BasicProperties(delivery_mode = 2))
print(message)
connection.close()
|
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
import pika
credentials = pika.PlainCredentials('shampoo', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
# 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除
result = channel.queue_declare('',exclusive=True)
# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储
channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct')
# 绑定exchange和队列 exchange 使我们能够确切地指定消息应该到哪个队列去
channel.queue_bind(exchange = 'python-test',queue = result.method.queue,routing_key='OrderId')
# 定义一个回调函数来处理消息队列中的消息,这里是打印出来
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag = method.delivery_tag)
print(body.decode())
#channel.basic_qos(prefetch_count=1)
# 告诉rabbitmq,用callback来接受消息
channel.basic_consume(result.method.queue,callback,
# 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉auto_ack = False)
channel.start_consuming()
|
topicd
这种模式和第二种模式差不多,exchange也是通过路由键routing_key来转发消息到指定的queue。 不同点是routing_key使用正则表达式支持模糊匹配,但匹配规则又与常规的正则表达式不同,比如“#”是匹配全部,“*”是匹配一个词。
举例:routing_key =“#orderid#”,意思是将消息转发至所有routing_key 包含“orderid”字符的队列中。代码和模式二类似