Exchange服务器之七.消息的交换:topic类型的Exchange(消费通过#或者*通配提供者的消息)
白羽 2018-07-26 来源 :网络 阅读 957 评论 0

摘要:本文将带你了解Exchange服务器之七.消息的交换:topic类型的Exchange(消费通过#或者*通配提供者的消息),希望本文对大家学Exchange有所帮助

topic exchange对routingKey是有要求的,必须是一个关键字的列表才能发挥正常作用,用“.”分割每个关键字,你可以定义任意的层级,唯一的限制是最大长度为255字节。 
topic exchange的通配符使用是在绑定的时候使用的,发消息的时候不起作用。 
topic的两个关键字: 
1.“”星号,代表一个词,比如上述规则:.error 表示所有系统的error级别的日志 
例:one.*  
匹配如下: 
one.two 
one.three 
one.four 
不匹配: 
one.two.a 
one.two.three.four 
2.“#”井号,代表零个或多个词,比如上述规则: .# 表示所有系统的所有消息,与单独一个#是等效的,core.# 表示核心系统的所有日志,它和 core. 的区别是,即使以后规则改为 <系统>.<日志级别>.<其他条件>.<其他条件>.……,core.# 依然可以完成匹配,而 core.* 则无法匹配 core.info.xxx.xxx 
例:one.*  
匹配: 
one 
one.two 
one.two.three 
不匹配: 
two 
two.one…

消费者和提供者的pom.xml和上一章一样

一.提供者Producer 
1.Exchange为topic类型的消息发送者:LogSenderTopic.java

package com.rabbit.exchange;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class LogSenderTopic {


    private Logger logger = LoggerFactory.getLogger(LogSenderFanout.class);

    //ConnectionFactory和Connection在正式开发时需要设置成单例
    private ConnectionFactory connectionFactory;
    private Connection connection;
    private Channel channel;

    /**
     * 在构造函数中获取连接
     */
    public LogSenderTopic(){
        super();
        try {
            connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
        } catch (Exception e) {
            logger.error("获取连接时出错...");
        }
    }

    /**
     * 关闭连接的方法
     */
    public boolean closeAll(){
        try {
            this.channel.close();
            this.connection.close();
        } catch (Exception e) {
            logger.error("关闭连接时异常...");
            return false;
        }
        return true;
    }

    /**
     * 发送消息到交换中心
     */
    public void sendMessage(String message,String routingKey){
        try {
            //声明一个exchange,名字为logs,类型为fanout
            channel.exchangeDeclare("logs", "topic");
            //发布消息到exchange上
            /**
             * 1.指定exchange的名字
             * 2.direct类型
             * 3.null...
             * 3.发送的消息
             */
            channel.basicPublish("logs", routingKey, null, message.getBytes());
            logger.debug("发送direct类型的消息"+message+"到exchange交换中心.");
        } catch (Exception e) {
            logger.error("消息发送失败:"+e);
        }
    }

}
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071

测试main方法:ExchangeTopicMain.java



package com.rabbit.main;

import com.rabbit.exchange.LogSenderTopic;

public class ExchangeTopicMain {

    public static void main(String[] args) throws InterruptedException {
        LogSenderTopic logSender = new LogSenderTopic();
        //轮流每一秒发送info和error的消息(让消费者1接受info和error级别消息,消费者2只接受info级别消息)
        while (true) {
            //匹配结果:
            //core.*:可以       core.#:可以
            //logSender.sendMessage("hello tiglle:core.info","core.info");
            //core.*:不行       core.#:可以
            logSender.sendMessage("hello tiglle:core.member.info","core.member.info");
            //注:生产者不能使用通配符,生产者的通配符定义会被当成普通字符文本
            //logSender.sendMessage("hello tiglle:core.#","core.#");//core.#将会被解析成普通文本
            Thread.sleep(1000);
        }
    }

}
1234567891011121314151617181920212223

二.消费者1(使用#多个单词匹配) 
1.接收消息的类LogReceiveTopic.java



package com.rabbit.exchange;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class LogReceiveTopic {

    private Logger logger = LoggerFactory.getLogger(LogReceiveTopic.class);

    //正式开发ConnectionFactory和Connection应该设置为单例
    private ConnectionFactory connectionFactory;
    private Connection connection;
    private Channel channel;

    /**
     * 在构造函数中获取连接
     */
    public LogReceiveTopic(){
        super();//Objece为其父类...
        try {
            connectionFactory = new ConnectionFactory();
            connection  = connectionFactory.newConnection();
            channel = connection.createChannel();
            //声明exchange,防止生成者没启动的时候,exchange不存在(生成者和消费者总有一个要先声明)
            channel.exchangeDeclare("logs", "topic");
        } catch (Exception e) {
            // TODO: handle exception
        }
    }

    /**
     * 关闭连接的方法
     */
    public boolean closeAll(){
        try {
            this.channel.close();
            this.connection.close();
        } catch (Exception e) {
            logger.error("关闭连接异常:"+e);
            return false;
        }
        return true;
    }

    /**
     * 消费消息
     */
    public void messageReceive(String routingKey){
        try {
            //获取临时列队:自己声明队列是比较麻烦的,
            //因此,RabbitMQ提供了简便的获取临时队列的方法,该队列会在连接断开后销毁
            String queueName = channel.queueDeclare().getQueue();
            //把获取的临时列队绑定到logs这个exchange交换中心,根据通配符绑定:#匹配n个字,*匹配多个字
            /**
             * 1.列队名称
             * 2.交换中心的名称
             * 3.routingKey和生产者发布消息的时候指定的一样
             */
            channel.queueBind(queueName, "logs", routingKey);
            //解决匿名内部类不能使用非final类型
            final String tempRoutingKey = routingKey;
            //定义一个Consumer消费logs的消息
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                    // TODO Auto-generated method stub
                    String message = new String(body,"UTF-8");
                    logger.debug("我是写硬盘的消费者:"+message+",routingKey为:"+tempRoutingKey);
                }
            };
            //自动确认为true,接收到消息后该消息就销毁了
            channel.basicConsume(queueName, true, consumer);
        } catch (Exception e) {
            logger.error("消费消息时异常:"+e);
        }
    }
}

测试启动的main方法:ExchangeTopicMain.java



package com.rabbit.main;

import com.rabbit.exchange.LogReceiveTopic;

public class ExchangeTopicMain {

    public static void main(String[] args) {
        LogReceiveTopic logReceive = new LogReceiveTopic();
        //#匹配n个词词词词词词(core.info,core.error,core.member.info,...)
        logReceive.messageReceive("core.#");
    }

}
1234567891011121314

三.消费者2(使用*单个单词匹配) 
1.接收消息的类:LogReceiveTopic.java



package com.rabbit.exchange;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class LogReceiveTopic {

    private Logger logger = LoggerFactory.getLogger(LogReceiveTopic.class);

    //正式开发ConnectionFactory和Connection应该设置为单例
    private ConnectionFactory connectionFactory;
    private Connection connection;
    private Channel channel;

    /**
     * 在构造函数中获取连接
     */
    public LogReceiveTopic(){
        super();//Objece为其父类...
        try {
            connectionFactory = new ConnectionFactory();
            connection  = connectionFactory.newConnection();
            channel = connection.createChannel();
            //声明exchange,防止生成者没启动的时候,exchange不存在(生成者和消费者总有一个要先声明)
            channel.exchangeDeclare("logs", "topic");
        } catch (Exception e) {
            // TODO: handle exception
        }
    }

    /**
     * 关闭连接的方法
     */
    public boolean closeAll(){
        try {
            this.channel.close();
            this.connection.close();
        } catch (Exception e) {
            logger.error("关闭连接异常:"+e);
            return false;
        }
        return true;
    }

    /**
     * 消费消息
     */
    public void messageReceive(String routingKey){
        try {
            //获取临时列队:自己声明队列是比较麻烦的,
            //因此,RabbitMQ提供了简便的获取临时队列的方法,该队列会在连接断开后销毁
            String queueName = channel.queueDeclare().getQueue();
            //把获取的临时列队绑定到logs这个exchange交换中心,只接受info级别日志
            /**
             * 1.列队名称
             * 2.交换中心的名称
             * 3.routingKey和生产者发布消息的时候指定的一样
             */
            channel.queueBind(queueName, "logs", routingKey);
            //解决匿名内部类不能使用非final变量问题
            final String tempRoutingKey = routingKey;
            //定义一个Consumer消费logs的消息
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                    // TODO Auto-generated method stub
                    String message = new String(body,"UTF-8");
                    logger.debug("我是打印日志的消费者:"+message+",routingKey为:"+tempRoutingKey);
                }
            };
            //自动确认为true,接收到消息后该消息就销毁了
            channel.basicConsume(queueName, true, consumer);
        } catch (Exception e) {
            logger.error("消费消息时异常:"+e);
        }
    }
}

2.测试启动的main方法:ExchangeTopicMain.java



package com.rabbit.main;

import com.rabbit.exchange.LogReceiveTopic;

public class ExchangeTopicMain {

    public static void main(String[] args) {
        LogReceiveTopic logReceive = new LogReceiveTopic();
        //*匹配1个词词词词词词
        logReceive.messageReceive("core.*");
    }

}


//匹配结果: 
//core.*:可以       core.#:可以 
//logSender.sendMessage(“hello tiglle:core.info”,”core.info”); 
//core.*:不行       core.#:可以 
logSender.sendMessage(“hello tiglle:core.member.info”,”core.member.info”); 
//注:生产者不能使用通配符,生产者的通配符定义会被当成普通字符文本 
//logSender.sendMessage(“hello tiglle:core.#”,”core.#”);//core.#将会被解析成普通文本    

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标系统运维之Exchange频道!

本文由 @白羽 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程