RabbitMQ的学习和使用——交换机的概念

RabbitMQ的学习和使用——交换机的概念

Scroll Down

RabbitMQ的学习和使用——交换机的概念

1.exchange交换机

exchange:接收消息,并根据路由键将消息转发到所绑定的队列
属性:

Name:交换机名称
Type:交换机类型
	包括:direct、topic、fanout、headers
Durability:是否需要持久化,true为持久化
Auto Delete:当最后一个绑定到exchange上的队列删除后,自动删除该exchange
Internal:当前exchange是否仅供RabbitMQ内部使用(为true时客户端不能直接向它发布消息,只能通过交换机之间的绑定接收消息)
Argument:扩展参数

2.Direct Exchange(直接模式 路由key相同)

含义:所有发送到Direct Exchange的消息会被转发到与routingKey完全匹配的绑定所对应的queue
注意:Direct模式可以使用RabbitMQ自带的exchange:default exchange,此时不需要对exchange进行任何绑定操作,每个队列都会自动以队列名作为routingKey绑定到default exchange,因此当routingKey与队列名一致时消息也会被路由到该队列。消息传递时,routingKey必须与绑定键完全匹配才会被队列接收,否则该消息会被丢弃。也就是说,只要生产者和消费者使用相同的交换机名称(Exchange)、交换机类型和相同的路由键(routingKey),消费者即可成功获取到消息。
image.png
生产者:

package life.xp.rabbitmqdemo.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Minimal producer for the direct-exchange example.
 *
 * <p>Connects to the broker at 127.0.0.1:5672 (virtual host "/") and publishes
 * five identical messages to the exchange {@code direct_exchange} with the
 * routing key {@code direct.key}. This class does not declare the exchange;
 * in this example {@code DirectConsumer} declares it and binds
 * {@code direct_queue} to it.
 * NOTE(review): presumably the consumer has to be started first so that the
 * exchange and binding exist — confirm.
 *
 * @author XP
 * @since 2020/3/29
 */
public class DirectProducer {
    public static void main(String[] args) {
        // 1. Create and configure the connection factory.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2. Open the connection.
            connection = factory.newConnection();
            // 3. Open a channel on it.
            channel = connection.createChannel();
            String exchangeName = "direct_exchange";
            String routingKey = "direct.key";
            // Encode explicitly as UTF-8 instead of relying on the platform default charset.
            byte[] body = "消息发送direct模式".getBytes(java.nio.charset.StandardCharsets.UTF_8);
            // 4. Publish the messages through the channel.
            for (int i = 0; i < 5; i++) {
                channel.basicPublish(exchangeName, routingKey, null, body);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            closeQuietly(channel, connection);
        }
    }

    /**
     * Closes the channel and then the connection, tolerating {@code null} for either.
     *
     * <p>Both references are still {@code null} when connecting failed, so they
     * are guarded. The connection is closed in a {@code finally} so that a
     * failure while closing the channel cannot leave the connection open.
     *
     * @param channel    channel to close, may be {@code null}
     * @param connection connection to close, may be {@code null}
     */
    private static void closeQuietly(Channel channel, Connection connection) {
        try {
            if (channel != null) {
                channel.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消费者:

package life.xp.rabbitmqdemo.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Minimal consumer for the direct-exchange example.
 *
 * <p>Connects to the broker at 127.0.0.1:5672 (virtual host "/"), declares the
 * direct exchange {@code direct_exchange} and the queue {@code direct_queue},
 * binds them with the routing key {@code direct.key}, and then prints every
 * message delivered to the queue. The receive loop has no exit condition, so
 * cleanup only runs after an exception.
 *
 * <p>NOTE(review): {@code QueueingConsumer} is deprecated in newer amqp-client
 * releases and absent from 5.x — consider {@code DefaultConsumer} when upgrading.
 *
 * @author XP
 * @since 2020/3/29
 */
public class DirectConsumer {
    public static void main(String[] args) {
        // 1. Create and configure the connection factory.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2. Open the connection.
            connection = factory.newConnection();
            // 3. Open a channel on it.
            channel = connection.createChannel();
            String exchangeName = "direct_exchange";
            String exchangeType = "direct";
            String queueName = "direct_queue";
            String routingKey = "direct.key";
            // Declare the exchange: durable=true, autoDelete=false, internal=false, no arguments.
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // Declare the queue: durable=false, exclusive=false, autoDelete=false, no arguments.
            channel.queueDeclare(queueName, false, false, false, null);
            // Bind the queue to the exchange with the routing key.
            channel.queueBind(queueName, exchangeName, routingKey);
            // 4. Create the consumer.
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            // Arguments: queue name, autoAck, consumer.
            channel.basicConsume(queueName, true, queueingConsumer);

            // 5. Consume messages; nextDelivery() blocks until one arrives.
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                String text = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("消息接受到:------>" + text);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            closeQuietly(channel, connection);
        }
    }

    /**
     * Closes the channel and then the connection, tolerating {@code null} for either.
     *
     * <p>Both references are still {@code null} when connecting failed, so they
     * are guarded. The connection is closed in a {@code finally} so that a
     * failure while closing the channel cannot leave the connection open.
     *
     * @param channel    channel to close, may be {@code null}
     * @param connection connection to close, may be {@code null}
     */
    private static void closeQuietly(Channel channel, Connection connection) {
        try {
            if (channel != null) {
                channel.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

exchange:
image.png
queue:
image.png
关系:
image.png

2.Topic Exchange

所有发送到Topic Exchange的消息会被转发到所有绑定键(binding key)与该消息routingKey匹配的queue上。Exchange将routingKey与队列绑定的topic进行模糊匹配(通配符:#匹配零个或多个词,*匹配不多不少的一个词,词与词之间用.分隔),队列需要绑定一个topic。简单来说,队列与交换机绑定的key就是它所需要的topic消息。
image.png
生产者:

package life.xp.rabbitmqdemo.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Producer for the topic-exchange example.
 *
 * <p>Connects to the broker at 127.0.0.1:5672 (virtual host "/") and publishes
 * one message each to the exchange {@code topic_exchange} with the routing
 * keys {@code doc.user}, {@code doc.delete}, {@code doc.add} and
 * {@code doc.update.message}. This class does not declare the exchange; in
 * this example {@code TopicConsumer} declares it and binds {@code topic_queue}
 * with the pattern {@code doc.#}.
 * NOTE(review): presumably the consumer has to be started first so that the
 * exchange and binding exist — confirm.
 *
 * @author xp
 * @since 2020/3/31
 */
public class TopicProducer {
    public static void main(String[] args) {
        // 1. Create and configure the connection factory.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2. Open the connection.
            connection = factory.newConnection();
            // 3. Open a channel on it.
            channel = connection.createChannel();
            String exchangeName = "topic_exchange";
            String routingKey = "doc.user";
            String routingKey1 = "doc.delete";
            String routingKey2 = "doc.add";
            String routingKey3 = "doc.update.message";
            // Encode explicitly as UTF-8 instead of relying on the platform default charset.
            java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;

            // 4. Publish one message per routing key through the channel.
            channel.basicPublish(exchangeName, routingKey, null,
                    "消息发送topic模式,doc.user".getBytes(utf8));
            channel.basicPublish(exchangeName, routingKey1, null,
                    "消息发送topic模式,doc.delete".getBytes(utf8));
            channel.basicPublish(exchangeName, routingKey2, null,
                    "消息发送topic模式,doc.add".getBytes(utf8));
            channel.basicPublish(exchangeName, routingKey3, null,
                    "消息发送topic模式,doc.update.message".getBytes(utf8));
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            closeQuietly(channel, connection);
        }
    }

    /**
     * Closes the channel and then the connection, tolerating {@code null} for either.
     *
     * <p>Both references are still {@code null} when connecting failed, so they
     * are guarded. The connection is closed in a {@code finally} so that a
     * failure while closing the channel cannot leave the connection open.
     *
     * @param channel    channel to close, may be {@code null}
     * @param connection connection to close, may be {@code null}
     */
    private static void closeQuietly(Channel channel, Connection connection) {
        try {
            if (channel != null) {
                channel.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消费者

package life.xp.rabbitmqdemo.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Consumer for the topic-exchange example.
 *
 * <p>Connects to the broker at 127.0.0.1:5672 (virtual host "/"), declares the
 * topic exchange {@code topic_exchange} and the queue {@code topic_queue},
 * binds them with the pattern {@code doc.#}, and then prints every message
 * delivered to the queue. The receive loop has no exit condition, so cleanup
 * only runs after an exception.
 *
 * <p>NOTE(review): {@code QueueingConsumer} is deprecated in newer amqp-client
 * releases and absent from 5.x — consider {@code DefaultConsumer} when upgrading.
 *
 * @author xp
 * @since 2020/3/31
 */
public class TopicConsumer {
    public static void main(String[] args) {
        // 1. Create and configure the connection factory.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2. Open the connection.
            connection = factory.newConnection();
            // 3. Open a channel on it.
            channel = connection.createChannel();
            String exchangeName = "topic_exchange";
            String exchangeType = "topic";
            String queueName = "topic_queue";
            String routingKey = "doc.#";
            // Declare the exchange: durable=true, autoDelete=false, internal=false, no arguments.
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // Declare the queue: durable=false, exclusive=false, autoDelete=false, no arguments.
            channel.queueDeclare(queueName, false, false, false, null);
            // Bind the queue to the exchange with the topic pattern.
            channel.queueBind(queueName, exchangeName, routingKey);
            // 4. Create the consumer.
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            // Arguments: queue name, autoAck, consumer.
            channel.basicConsume(queueName, true, queueingConsumer);

            // 5. Consume messages; nextDelivery() blocks until one arrives.
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                String text = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("消息接受到:------>" + text);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            closeQuietly(channel, connection);
        }
    }

    /**
     * Closes the channel and then the connection, tolerating {@code null} for either.
     *
     * <p>Both references are still {@code null} when connecting failed, so they
     * are guarded. The connection is closed in a {@code finally} so that a
     * failure while closing the channel cannot leave the connection open.
     *
     * @param channel    channel to close, may be {@code null}
     * @param connection connection to close, may be {@code null}
     */
    private static void closeQuietly(Channel channel, Connection connection) {
        try {
            if (channel != null) {
                channel.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

image.png
image.png

3.Fanout Exchange

不处理路由键,只需要简单的将队列绑定到交换机上,发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。性能最好。
image.png
生产者:

package life.xp.rabbitmqdemo.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Producer for the fanout-exchange example.
 *
 * <p>Connects to the broker at 127.0.0.1:5672 (virtual host "/") and publishes
 * a single message to the exchange {@code fanout_exchange} with an empty
 * routing key. This class does not declare the exchange; in this example
 * {@code FanoutConsumer} declares it and binds {@code fanout_queue} to it.
 * NOTE(review): presumably the consumer has to be started first so that the
 * exchange and binding exist — confirm.
 *
 * @author xp
 * @since 2020/3/31
 */
public class FanoutProducer {
    public static void main(String[] args) {
        // 1. Create and configure the connection factory.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2. Open the connection.
            connection = factory.newConnection();
            // 3. Open a channel on it.
            channel = connection.createChannel();
            String exchangeName = "fanout_exchange";
            String routingKey = "";

            // 4. Publish the message through the channel, encoded explicitly as
            // UTF-8 instead of relying on the platform default charset.
            channel.basicPublish(exchangeName, routingKey, null,
                    "消息发送fanout模式".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            closeQuietly(channel, connection);
        }
    }

    /**
     * Closes the channel and then the connection, tolerating {@code null} for either.
     *
     * <p>Both references are still {@code null} when connecting failed, so they
     * are guarded. The connection is closed in a {@code finally} so that a
     * failure while closing the channel cannot leave the connection open.
     *
     * @param channel    channel to close, may be {@code null}
     * @param connection connection to close, may be {@code null}
     */
    private static void closeQuietly(Channel channel, Connection connection) {
        try {
            if (channel != null) {
                channel.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消费者:

package life.xp.rabbitmqdemo.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Consumer for the fanout-exchange example.
 *
 * <p>Connects to the broker at 127.0.0.1:5672 (virtual host "/"), declares the
 * fanout exchange {@code fanout_exchange} and the queue {@code fanout_queue},
 * binds them with an empty routing key, and then prints every message
 * delivered to the queue. The receive loop has no exit condition, so cleanup
 * only runs after an exception.
 *
 * <p>NOTE(review): {@code QueueingConsumer} is deprecated in newer amqp-client
 * releases and absent from 5.x — consider {@code DefaultConsumer} when upgrading.
 *
 * @author xp
 */
public class FanoutConsumer {
    public static void main(String[] args) {
        // 1. Create and configure the connection factory.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2. Open the connection.
            connection = factory.newConnection();
            // 3. Open a channel on it.
            channel = connection.createChannel();
            String exchangeName = "fanout_exchange";
            String exchangeType = "fanout";
            String queueName = "fanout_queue";
            String routingKey = "";
            // Declare the exchange: durable=true, autoDelete=false, internal=false, no arguments.
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // Declare the queue: durable=false, exclusive=false, autoDelete=false, no arguments.
            channel.queueDeclare(queueName, false, false, false, null);
            // Bind the queue to the exchange (empty routing key).
            channel.queueBind(queueName, exchangeName, routingKey);
            // 4. Create the consumer.
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            // Arguments: queue name, autoAck, consumer.
            channel.basicConsume(queueName, true, queueingConsumer);

            // 5. Consume messages; nextDelivery() blocks until one arrives.
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                String text = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("消息接受到:------>" + text);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            closeQuietly(channel, connection);
        }
    }

    /**
     * Closes the channel and then the connection, tolerating {@code null} for either.
     *
     * <p>Both references are still {@code null} when connecting failed, so they
     * are guarded. The connection is closed in a {@code finally} so that a
     * failure while closing the channel cannot leave the connection open.
     *
     * @param channel    channel to close, may be {@code null}
     * @param connection connection to close, may be {@code null}
     */
    private static void closeQuietly(Channel channel, Connection connection) {
        try {
            if (channel != null) {
                channel.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

image.png

4.消息的属性

消息分为properties与payload,其中properties包含各种RabbitMQ定义的属性,自己定义的属性可以放在headers中;payload就是消息的主要内容。

            // Build the message properties: persistent delivery, declared body
            // encoding, per-message expiration and custom headers.
            // NOTE(review): `map` is defined outside this fragment; presumably a
            // Map<String, Object> of application headers — confirm.
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)          // 1 = non-persistent, 2 = persistent
                    .contentEncoding("UTF-8")
                    .expiration("10000")      // expiration in milliseconds (10 seconds)
                    .headers(map)             // custom, application-defined properties
                    .build();
            // 4. Publish through the channel. The body is encoded as UTF-8 so it
            // matches the contentEncoding declared above rather than the platform default.
            channel.basicPublish(exchangeName, routingKey, basicProperties,
                    "消息发送fanout模式".getBytes(java.nio.charset.StandardCharsets.UTF_8));