RabbitMQ:四种ExChange用法
诗诗 2018-03-14 来源 :网络 阅读 976 评论 0

摘要:RabbitMQ发送消息时,都是先把消息发送给ExChange(交换机),然后再分发给有相应RoutingKey(路由)关系的Queue(队列)。 ExChange和Queue之前是多对多的关系。 RabbitMQ 3.0之后创建ExChange时,有四种类型可选“fanout、direct、topic、headers”。

RabbitMQ发送消息时,都是先把消息发送给ExChange(交换机),然后再分发给有相应RoutingKey(路由)关系的Queue(队列)。

ExChange和Queue之前是多对多的关系。

RabbitMQ 3.0之后创建ExChange时,有四种类型可选“fanout、direct、topic、headers”。

 

一、fanout

当向一个fanout发送一个消息时,RoutingKey的设置不起作用。

消息会被发送给同一个交换机下的所有队列,每个队列接收到的消息是一样的;

一个队列内有所有消费者(包含那些并没有相应RoutingKey的消费者),将平分队列接收到的消息。

 

----------------消息生产者----------------

ConnectionFactory factory = new ConnectionFactory();

factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机

factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口

factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名

factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

 

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

 

// 声明路由名字和类型

channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false, null);

String message = "hello world! ";

 

for(int i=0;i<100;i++)

{

channel.basicPublish(EXCHANGE_NAME, "", null, (message+i).getBytes());

}

 

System.out.println("Sent msg finish");

 

channel.close();

connection.close();


----------------消息消费者----------------

ConnectionFactory factory = new ConnectionFactory();

 

factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机

factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口

factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名

factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

 

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

 

//声明路由名字和类型

channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false, null);

//声明队列

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

//绑定路由和队列

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routkey2", null);

 

System.out.println(" Waiting for msg....");

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,  AMQP.BasicProperties properties, byte[] body) {

String message = "";

try

{

message = new String(body, "UTF-8");

}

catch (UnsupportedEncodingException e)

{

e.printStackTrace();

}

catch (Throwable ex)

{

ex.printStackTrace();

}

 

System.out.println("Received msg='" + message + "'");

}

};

channel.basicConsume(QUEUE_NAME, true, consumer);

 

二、direct

当向一个direct发送一个消息时,消息会被发送给同一个交换机下的拥有相应RoutingKey的队列,每个队列接收到的消息是一样的;

一个队列内拥有相应RoutingKey的消费者,将平分队列接收到的消息。

 

----------------消息生产者----------------

ConnectionFactory factory = new ConnectionFactory();

 

factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机

factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口

factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名

factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

 

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

 

// 声明路由名字和类型

channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);

String message = "hello world! ";

 

for(int i=0;i<100;i++)

{

channel.basicPublish(EXCHANGE_NAME, "routingkey1", null, (message+i).getBytes());

}

 

System.out.println("Sent msg is '" + message + "'");

 

channel.close();

connection.close();


----------------消息消费者----------------

ConnectionFactory factory = new ConnectionFactory();

 

factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机

factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口

factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名

factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

 

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

 

//声明路由名字和类型

channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);

//声明队列

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

//绑定路由和队列

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", null);

 

System.out.println(" Waiting for msg....");

Consumer consumer = new DefaultConsumer(channel)

{

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,

byte[] body)

{

String message = "";

try

{

message = new String(body, "UTF-8");

}

catch (UnsupportedEncodingException e)

{

e.printStackTrace();

}

catch (Throwable ex)

{

ex.printStackTrace();

}

 

System.out.println("1 Received msg='" + message + "'");

}

};

 

channel.basicConsume(QUEUE_NAME, true, consumer);

 

三、topic

当向一个topic发送一个消息时,消息会被发送给同一个交换机下的拥有相应RoutingKey的队列,每个队列接收到的消息是一样的;

一个队列内有所有消费者(包含那些并没有相应RoutingKey的消费者),将平分队列接收到的消息。

 

----------------消息生产者----------------

ConnectionFactory factory = new ConnectionFactory();

 

factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机

factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口

factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名

factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

 

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

 

// 声明路由名字和类型

channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null);

String message = "hello world! ";

 

// int i=101;

for (int i = 0; i < 100; i++)

{

channel.basicPublish(EXCHANGE_NAME, "routingkey1", null, (message + i).getBytes());

}

 

System.out.println("Sent msg is '" + message + "'");

 

channel.close();

connection.close();

 

----------------消息消费者----------------

ConnectionFactory factory = new ConnectionFactory();

 

factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机

factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口

factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名

factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

 

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

 

// 声明路由名字和类型

channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null);

//声明队列

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

//绑定路由和队列// 把队列绑定到路由上并指定headers

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", null);

 

System.out.println("1 Waiting for msg....");

Consumer consumer = new DefaultConsumer(channel)

{

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)

{

String message = "";

try

{

message = new String(body, "UTF-8");

}

catch (UnsupportedEncodingException e)

{

e.printStackTrace();

}

catch (Throwable ex)

{

ex.printStackTrace();

}

 

System.out.println("1 Received msg='" + message + "'");

}

};

channel.basicConsume(QUEUE_NAME, true, consumer);

 

四、headers

当向一个headers发送一个消息时,消息会被发送给同一个交换机下的拥有相应RoutingKey或者headers的队列,每个队列接收到的消息是一样的;

一个队列内有所有消费者(包含那些并没有相应RoutingKey或headers的消费者),将平分队列接收到的消息。

 

----------------消息生产者----------------

ConnectionFactory factory = new ConnectionFactory();

 

factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机

factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口

factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名

factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

 

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

 

// 声明路由名字和类型

channel.exchangeDeclare(EXCHANGE_NAME, "headers", true, false, null);

 

// 设置消息头键值对信息

Map<String, Object> headers = new Hashtable<String, Object>();

headers.put("name", "jack");

headers.put("age", 31);

Builder builder = new Builder();

builder.headers(headers);

 

String message = "hello world! ";

 

// int i=101;

for (int i = 0; i < 100; i++)

{

channel.basicPublish(EXCHANGE_NAME, "routingkey1", builder.build(), (message + i).getBytes());

}

 

System.out.println("Sent msg is '" + message + "'");

 

channel.close();

connection.close();

 

----------------消息消费者----------------

ConnectionFactory factory = new ConnectionFactory();

 

factory.setHost(S_RabbitMQ.QUEUE_IP);

factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口

factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名

factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

 

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

 

// 声明路由名字和类型

channel.exchangeDeclare(EXCHANGE_NAME, "headers", true, false, null);

// 声明队列

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

 

// 设置消息头键值对信息

Map<String, Object> headers = new Hashtable<String, Object>();

// 这里x-match有两种类型

// all:表示所有的键值对都匹配才能接受到消息

// any:表示只要有键值对匹配就能接受到消息

headers.put("x-match", "all");

headers.put("name", "jack");

headers.put("age", 30);

 

// 把队列绑定到路由上并指定headers

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", headers);

 

System.out.println(" Waiting for msg....");

Consumer consumer = new DefaultConsumer(channel)

{

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException

{

 

System.out.println("Received start --------------");

 

for (Entry<String, Object> entry : properties.getHeaders().entrySet())

{

System.out.println(entry.getKey() + "=" + entry.getValue());

}

 

String message = new String(body, "UTF-8");

 

System.out.println("msg='" + message + "'");

System.out.println("Received end --------------");

}

};

channel.basicConsume(QUEUE_NAME, true, consumer);

 

希望这篇文章可以帮助到你。总之,同学们,你想要的职坐标IT频道都能找到!

本文由 @诗诗 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程