Exchange服务器之RabbitMQ Exchange类型详解
白羽 2018-07-25 来源 :网络 阅读 1586 评论 0

摘要:本文将带你了解Exchange服务器之RabbitMQ Exchange类型详解,希望本文对大家学Exchange有所帮助

在上一篇文章中,我们知道了RabbitMQ的消息流程如下:但在具体的使用中,我们还需知道exchange的类型,因为不同的类型对应不同的队列和路由规则。在rabbitmq中,exchange有4个类型:direct,topic,fanout,header。direct exchange此类型的exchange路由规则很简单:exchange在和queue进行binding时会设置routingkeychannel.QueueBind(queue: "create_pdf_queue",
                    exchange: "pdf_events",
                    routingKey: "pdf_create",
                    arguments: null); 然后我们在将消息发送到exchange时会设置对应的routingkey:channel.BasicPublish(exchange: "pdf_events",
                        routingKey: "pdf_create",
                        basicProperties: properties,
                        body: body); 在direct类型的exchange中,只有这两个routingkey完全相同,exchange才会选择对应的binging进行消息路由。具体的流程如下:通过代码可以会理解好一点:var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    // Direct类型的exchange, 名称 pdf_events
    channel.ExchangeDeclare(exchange: "pdf_events",
                            type: ExchangeType.Direct,
                            durable: true,
                            autoDelete: false,
                            arguments: null);

    // 创建create_pdf_queue队列
    channel.QueueDeclare(queue: "create_pdf_queue",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

    //创建 pdf_log_queue队列
    channel.QueueDeclare(queue: "pdf_log_queue",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

    //绑定 pdf_events --> create_pdf_queue 使用routingkey:pdf_create
    channel.QueueBind(queue: "create_pdf_queue",
                        exchange: "pdf_events",
                        routingKey: "pdf_create",
                        arguments: null);

    //绑定 pdf_events --> pdf_log_queue 使用routingkey:pdf_log
    channel.QueueBind(queue: "pdf_log_queue",
                        exchange: "pdf_events",
                        routingKey: "pdf_log",
                        arguments: null);


    var message = "Demo some pdf creating...";
    var body = Encoding.UTF8.GetBytes(message);
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;

    //发送消息到exchange :pdf_events ,使用routingkey: pdf_create
    //通过binding routinekey的比较,次消息会路由到队列 create_pdf_queue
    channel.BasicPublish(exchange: "pdf_events",
                routingKey: "pdf_create",
                basicProperties: properties,
                body: body);

    message = "pdf loging ...";
    body = Encoding.UTF8.GetBytes(message);
    properties = channel.CreateBasicProperties();
    properties.Persistent = true;

    //发送消息到exchange :pdf_events ,使用routingkey: pdf_log
    //通过binding routinekey的比较,次消息会路由到队列 pdf_log_queue
    channel.BasicPublish(exchange: "pdf_events",
            routingKey: "pdf_log",
            basicProperties: properties,
            body: body);

    
} topic exchange此类型exchange和上面的direct类型差不多,但direct类型要求routingkey完全相等,这里的routingkey可以有通配符:'*','#'.其中'*'表示匹配一个单词, '#'则表示匹配没有或者多个单词如上图第一个binding:exchange: agreementsqueue A:  berlin_agreementsbinding routingkey: agreements.eu.berlin.#第二个binding: exchange: agreementsqueue B: all_agreementsbinding routingkey: agreements.#第三个binding:exchange: agreementsqueue c: headstore_agreementsbinding routingkey: agreements.eu.*.headstore所以如果我们消息的routingkey为agreements.eu.berlin那么符合第一和第二个binding,但最后一个不符合,具体的代码如下:var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    // Topic类型的exchange, 名称 agreements
    channel.ExchangeDeclare(exchange: "agreements",
                            type: ExchangeType.Topic,
                            durable: true,
                            autoDelete: false,
                            arguments: null);

    // 创建berlin_agreements队列
    channel.QueueDeclare(queue: "berlin_agreements",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

    //创建 all_agreements 队列
    channel.QueueDeclare(queue: "all_agreements",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

    //创建 headstore_agreements 队列
    channel.QueueDeclare(queue: "headstore_agreements",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

    //绑定 agreements --> berlin_agreements 使用routingkey:agreements.eu.berlin.#
    channel.QueueBind(queue: "berlin_agreements",
                        exchange: "agreements",
                        routingKey: "agreements.eu.berlin.#",
                        arguments: null);

    //绑定 agreements --> all_agreements 使用routingkey:agreements.#
    channel.QueueBind(queue: "all_agreements",
                        exchange: "agreements",
                        routingKey: "agreements.#",
                        arguments: null);

    //绑定 agreements --> headstore_agreements 使用routingkey:agreements.eu.*.headstore
    channel.QueueBind(queue: "headstore_agreements",
                        exchange: "agreements",
                        routingKey: "agreements.eu.*.headstore",
                        arguments: null);


    var message = "hello world";
    var body = Encoding.UTF8.GetBytes(message);
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;

    //发送消息到exchange :agreements ,使用routingkey: agreements.eu.berlin
    //agreements.eu.berlin 匹配  agreements.eu.berlin.# 和agreements.#
    //agreements.eu.berlin 不匹配  agreements.eu.*.headstore
    //最终次消息会路由到队里:berlin_agreements(agreements.eu.berlin.#) 和 all_agreements(agreements.#)
    channel.BasicPublish(exchange: "agreements",
                            routingKey: "agreements.eu.berlin",
                            basicProperties: properties,
                            body: body);

               
}    

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标系统运维之Exchange频道!

本文由 @白羽 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程