1.RabbitMQ的学习和使用——基本概念与安装
1.什么是RabbitMQ?
RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,erlang编写,基于AMQP协议。
高性能原因:erlang有着和原生socket一样的延迟。
1.AMQP协议
AMQP协议:是具有现代特征的二进制协议。是一个提供统一信息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向信息的中间层设计。
2.相关术语:
server:又称broker,接受客户端的连接,实现amqp实体服务。
connection:连接,应用程序与broker的网络连接。
channel:网络信道,数据的读写通道,客户端可以建立多个channel,每个channel就是一个会话。
message:消息,服务器和应用程序直接传送的数据,由properties和body。
virtual host:虚拟地址,用于逻辑隔离,最上层的消息路由。一个virtual host可以有若干个exchange和queue,同一个virtual host不能有同名的exchange和queue,业务的隔离。
exchange:交换机,接受消息,根据路由键转发到绑定的队列。
binding:exchange与queue之间的虚拟连接,可以包含多个routing key。
routing key:路由规则,虚拟机可以用它来确定如何路由一个特定消息。
queue:消息队列,保存消息,转发给消费者。
3.整体架构
生产者只关心消息的投递在哪个交换机上,而消费者只关心消息从哪个队列拿到,从而达到解耦。
4.消息流转过程
2.安装
准备环境:
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel
下载:
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
安装(按顺序执行):
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
配置文件:
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
服务启动和停止:
启动 rabbitmq-server start &
查看 lsof -i:5672
停止 rabbitmqctl app_stop
管理插件:rabbitmq-plugins enable rabbitmq_management
访问地址:http://IP:15672/
3.相关命令
基本命令:
rabbitmqctl stop_app:关闭应用
rabbitmqctl start_app:启动应用
rabbitmqctl status:节点状态
rabbitmqctl reset:移除所有数据,要在rabbitmqctl stop_app之后
用户命令:
rabbitmqctl add_user username password:添加用户
rabbitmqctl list_users:列出所有用户
rabbitmqctl delete_user username:删除用户
rabbitmqctl clear_premissons -p vhostpath username:清除用户权限
rabbitmqctl list_user_permissions username:列出用户权限
rabbitmqctl change_password username newpassword:修改密码
rabbitmqctl set_permissions -p vhostpath username ".*.*.*":设置用户权限
虚拟主机命令:
rabbitmqctl add_vhost vhostpath:创建虚拟主机
rabbitmqctl list-vhosts:列出所有虚拟主机
rabbitmqctl list_permissons -p vhostpath:列出虚拟主机所有权限
vhostpath delete_vhost vhostpath:删除虚拟主机
集群命令
rabbitmqctl join_cluster <clusternode> [--ram]:组成集群命令
rabbitmqctl cluster_status:查看集群状态
rabbitmqctl change_cluster_node_type disc |ram:修改集群节点的存储形式
rabbitmqctl forget_cluster_node [--offline]:忘记节点
rabbitmqctl rename_cluster_node oldnode1 newnodew1 [oldnode1] [newnodew1]
4.基本demo
依赖:
生产者:
package life.xp.rabbitmqdemo.basic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @ClassName: RabbitProducer
* @Description: 基本的生产者
* @author: XP
* @data: 2020/3/29
*/
public class RabbitProducer {
public static void main(String[] args) {
//1.创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
//2.创建连接
Connection connection=null;
Channel channel=null;
try {
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
connection = factory.newConnection();
//3.创建通道
channel = connection.createChannel();
//4.通过channel发送消息
for (int i = 0; i < 5; i++) {
channel.basicPublish("","test01",null,"消息发送".getBytes());
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
try {
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
}
package life.xp.rabbitmqdemo.basic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @ClassName: RabbitConsumer
* @Description: 基本的消费者
* @author: XP
* @data: 2020/3/29
*/
public class RabbitConsumer {
public static void main(String[] args) {
//1.创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
//2.创建连接
Connection connection=null;
Channel channel=null;
try {
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
connection = factory.newConnection();
//3.创建通道
channel = connection.createChannel();
//4.定义队列名称 持久队列 独占队列 自动删除队列 参数
channel.queueDeclare("test01",false, false, false,null);
//5.创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume("test01",true,queueingConsumer);
//5.消费消息
while (true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("消息接受到:------>"+(new String(delivery.getBody())));
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
}