exchange服务器之RabbitMQ(1) 三种exchange总结
白羽 2018-12-11 来源 :网络 阅读 885 评论 0

摘要:本文将带你了解exchange服务器之RabbitMQ(1) 三种exchange总结,希望本文对大家学Exchange有所帮助。

    本文将带你了解exchange服务器之RabbitMQ(1) 三种exchange总结,希望本文对大家学Exchange有所帮助。


1.AMQP

2.几个基本概念

3.三种 exchange

4.思考总结



……1.AMQP……


        在AMQP(高级消息队列协议)协议中,queue、exchange、binding构成了协议的核心。exchange是一个重要的组件,那么如何理解exchange呢?刚刚开始学习消息队列的时候,想起作品展时候做的一个基于SMTP协议的邮箱。不同的是MQ中没有发送者和接受者的概念,也不是客户端服务器的概念。MQ专注于应用程序之间的消息通信,是一种生产者和消费者之间的关系。exchange是producer和consumer之间消息交换机。




……2.几个基本概念……


        在介绍exchange类型之前,需要了解几个基本的概念。

    Broker:消息队列服务器实体
    exchange:消息交换机,指定消息规则,处理消息和队列之间的关系
    queue:队列载体,消息投入队列中
    binding:绑定,把exchange和queue按照路由规则绑定起来
    Routing Key:路由关键字。exchange根据这个进行消息投递
    vhost:虚拟消息服务器,每个RabbitMQ服务器都能够创建虚拟消息服务器。
    producer:生产者,投递消息的程序
    consumer:消费者,接受消息的程序
    channel:信道(重要概念),打开信道才能进行通信,一个channel代码一个会话任务。



……3.三种exchange……

        

           vhost通过创建channel连接程序,会根据routingKey将M从exchange到queue,那根据什么规则进行投递消息?如果有好多不同类型的M,那么应该如何选择?AMQP中定义了不同类型的exchange就可以发挥作用了。一种三种类型:fanout,topic,direct。每一种类型实现不同的路由算法。



3.1 Fanout Exchange 不规则路由


         将收到的消息广播到绑定的队列上。当你发送一条消息到fanout exchange上,它会把消息投递给所有附加到此交换机上面的队列。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。


生产者(提供服务的系统):

    package cn.itcast.rabbitmq.ps;
     
    import cn.itcast.rabbitmq.util.ConnectionUtil;
     
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
     
    public class Send {
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            // 声明exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // 消息内容
            String message = "商品已经新增,id = 1000";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            channel.close();
            connection.close();
        }
    }


消费者(调用服务的系统):

    package cn.itcast.rabbitmq.ps;
     
    import cn.itcast.rabbitmq.util.ConnectionUtil;
     
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
     
    public class Recv {
     
        private final static String QUEUE_NAME = "test_queue_fanout_1";
     
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
     
        public static void main(String[] argv) throws Exception {
     
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
     
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
     
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
     
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
     
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
     
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" 前台系统: '" + message + "'");
                Thread.sleep(10);
     
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }



3.2 Direct Exchange 处理路由


     生产者在发送消息的时候都需要指定一个routingkey和exchange,exchange在接到rooutingkey的时候与该exchange关联的所有binding中的bindingkey进行比较,如果相等,则发送到binding对应的queue中。


生产者(提供服务的系统):

    package cn.itcast.rabbitmq.routing;
     
    import cn.itcast.rabbitmq.util.ConnectionUtil;
     
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
     
    public class Send {
     
        private final static String EXCHANGE_NAME = "test_exchange_direct";
     
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
     
            // 声明exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
     
            // 消息内容
            String message = "删除商品, id = 1001";
            channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
     
            channel.close();
            connection.close();
        }
    }


消费者(调用服务的系统):

    package cn.itcast.rabbitmq.routing;
     
    import cn.itcast.rabbitmq.util.ConnectionUtil;
     
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
     
    public class Recv {
     
        private final static String QUEUE_NAME = "test_queue_direct_1";
     
        private final static String EXCHANGE_NAME = "test_exchange_direct";
     
        public static void main(String[] argv) throws Exception {
     
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
     
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
     
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
     
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
     
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
     
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" 前台系统: '" + message + "'");
                Thread.sleep(10);
     
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }



3.3 Topic Exchange 通配符路由


         将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。这个很像Javascript中的正则表达式。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。


生产者(提供服务的系统):

    package cn.itcast.rabbitmq.topic;
     
    import cn.itcast.rabbitmq.util.ConnectionUtil;
     
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
     
    public class Send {
     
        private final static String EXCHANGE_NAME = "test_exchange_topic";
     
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
     
            // 声明exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
     
            // 消息内容
            String message = "删除商品,id = 1001";
            channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
     
            channel.close();
            connection.close();
        }
    }



消费者(调用服务的系统):

消费者1:

    package cn.itcast.rabbitmq.topic;
     
    import cn.itcast.rabbitmq.util.ConnectionUtil;
     
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
     
    public class Recv {
     
        private final static String QUEUE_NAME = "test_queue_topic_1";
     
        private final static String EXCHANGE_NAME = "test_exchange_topic";
     
        public static void main(String[] argv) throws Exception {
     
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
     
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
     
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
     
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
     
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
     
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" 前台系统: '" + message + "'");
                Thread.sleep(10);
     
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }


消费者2:

    package cn.itcast.rabbitmq.topic;
     
    import cn.itcast.rabbitmq.util.ConnectionUtil;
     
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
     
    public class Recv2 {
     
        private final static String QUEUE_NAME = "test_queue_topic_2";
     
        private final static String EXCHANGE_NAME = "test_exchange_topic";
     
        public static void main(String[] argv) throws Exception {
     
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
     
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
     
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");
     
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
     
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
     
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" 搜索系统: '" + message + "'");
                Thread.sleep(10);
     
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }



本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标系统运维之Exchange频道!


本文由 @白羽 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程