RabbitMQ的学习和使用——基本概念与安装

RabbitMQ的学习和使用——基本概念与安装

Scroll Down

1.RabbitMQ的学习和使用——基本概念与安装

1.什么是RabbitMQ?

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,erlang编写,基于AMQP协议。
高性能原因:erlang有着和原生socket一样的延迟。

1.AMQP协议

AMQP协议:是具有现代特征的二进制协议。是一个提供统一信息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向信息的中间层设计。
image.png

2.相关术语:

server:又称broker,接受客户端的连接,实现amqp实体服务。
connection:连接,应用程序与broker的网络连接。
channel:网络信道,数据的读写通道,客户端可以建立多个channel,每个channel就是一个会话。
message:消息,服务器和应用程序直接传送的数据,由properties和body。
virtual host:虚拟地址,用于逻辑隔离,最上层的消息路由。一个virtual host可以有若干个exchange和queue,同一个virtual host不能有同名的exchange和queue,业务的隔离。
exchange:交换机,接受消息,根据路由键转发到绑定的队列。
binding:exchange与queue之间的虚拟连接,可以包含多个routing key。
routing key:路由规则,虚拟机可以用它来确定如何路由一个特定消息。
queue:消息队列,保存消息,转发给消费者。

3.整体架构

image.png
生产者只关心消息的投递在哪个交换机上,而消费者只关心消息从哪个队列拿到,从而达到解耦。

4.消息流转过程

image.png

2.安装

准备环境:
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel 
下载:
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
安装(按顺序执行):
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

配置文件:
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
服务启动和停止:
启动 rabbitmq-server start &
查看 lsof -i:5672
停止 rabbitmqctl app_stop
管理插件:rabbitmq-plugins enable rabbitmq_management
访问地址:http://IP:15672/

3.相关命令

基本命令:
rabbitmqctl stop_app:关闭应用
rabbitmqctl start_app:启动应用
rabbitmqctl status:节点状态
rabbitmqctl reset:移除所有数据,要在rabbitmqctl stop_app之后
用户命令:
rabbitmqctl add_user username password:添加用户
rabbitmqctl list_users:列出所有用户
rabbitmqctl delete_user username:删除用户
rabbitmqctl clear_premissons -p vhostpath username:清除用户权限
rabbitmqctl list_user_permissions username:列出用户权限
rabbitmqctl change_password username newpassword:修改密码
rabbitmqctl set_permissions -p vhostpath username ".*.*.*":设置用户权限
虚拟主机命令:
rabbitmqctl add_vhost vhostpath:创建虚拟主机
rabbitmqctl list-vhosts:列出所有虚拟主机
rabbitmqctl list_permissons -p vhostpath:列出虚拟主机所有权限
vhostpath delete_vhost vhostpath:删除虚拟主机
集群命令
rabbitmqctl join_cluster <clusternode> [--ram]:组成集群命令
rabbitmqctl cluster_status:查看集群状态
rabbitmqctl change_cluster_node_type disc |ram:修改集群节点的存储形式
rabbitmqctl forget_cluster_node [--offline]:忘记节点
rabbitmqctl rename_cluster_node oldnode1 newnodew1 [oldnode1] [newnodew1]

4.基本demo

依赖:


生产者:

package life.xp.rabbitmqdemo.basic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName: RabbitProducer
 * @Description: 基本的生产者
 * @author: XP
 * @data: 2020/3/29
 */
public class RabbitProducer {
    public static void main(String[] args) {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //2.创建连接
        Connection connection=null;
        Channel channel=null;
        try {
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
  	    factory.setVirtualHost("/");
            connection = factory.newConnection();
            //3.创建通道
            channel = connection.createChannel();
            //4.通过channel发送消息
            for (int i = 0; i < 5; i++) {
                channel.basicPublish("","test01",null,"消息发送".getBytes());
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            try {
                channel.close();
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }

        }




    }
}

image.png

package life.xp.rabbitmqdemo.basic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName: RabbitConsumer
 * @Description: 基本的消费者
 * @author: XP
 * @data: 2020/3/29
 */
public class RabbitConsumer {
    public static void main(String[] args) {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //2.创建连接
        Connection connection=null;
        Channel channel=null;
        try {
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            connection = factory.newConnection();
            //3.创建通道
            channel = connection.createChannel();
            //4.定义队列名称  持久队列 独占队列 自动删除队列 参数
            channel.queueDeclare("test01",false, false, false,null);
            //5.创建消费者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            channel.basicConsume("test01",true,queueingConsumer);

            //5.消费消息
            while (true){
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                System.out.println("消息接受到:------>"+(new String(delivery.getBody())));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }

        }
    }
}

image.png
image.png