摘要:总结一下几种ExchangeTypes。 即producer不是将消息直接放到队列中,而是先到exchange中,exchange主要用于控制消息到队列的路由,根据具体的exchange type将消息传给需要的队列或者直接废弃。 在这一篇中总结一下那些用到的exchange type。
总结一下几种ExchangeTypes。
即producer不是将消息直接放到队列中,而是先到exchange中,exchange主要用于控制消息到队列的路由,根据具体的exchange type将消息传给需要的队列或者直接废弃。
在这一篇中总结一下那些用到的exchange type。
一.Direct Exchange
direct exchange算是最基本的了。
direct exchange用于将带上routing key的消息传值拥有相同routing key的队列中。
当我们想用一个简单的标识符区别所有传入同一个exchange中的消息时direct exchange就非常合适。
代码如下:
private static String DIRECT_EXCHANGE = "DIRECT_EXCHAGNE";
static class FanoutProducer {
public static void main(String[] args) throws IOException {
ConnectionFactory connectionFactory = new ConnectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel= connection.createChannel();;
String content = "I miss the conversation";
channel.exchangeDeclare(DIRECT_EXCHANGE, ExchangeTypes.DIRECT);
channel.basicPublish(DIRECT_EXCHANGE, "alvez", null, content.getBytes());
}
}
static class FanoutConsumer {
public static void main(String[] args) throws IOException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel= connection.createChannel();
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, DIRECT_EXCHANGE, "alvez");
QueueingConsumer consumer = new QueueingConsumer(channel);
String s = channel.basicConsume(queueName, true, consumer);
System.out.println(s);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("From:" + routingKey + "':'" + message + "'");
}
}
二.Fanout Exchange
fanout和routing key无关,它将消息无差别地(indiscriminately)传送给所有队列。
fanout exchange通常用于发布/订阅模式。
将消息传送给不同的队列,不同的队列对同一种消息采取不同的行为。
比如,现在有一个客户订单消息被三个队列接收,队列1完成该订单,队列2将订单写入日志,队列3将订单发给别的部门什么的。
比如下面的代码,消费者可以获得routing key并输出,但能否获取与routing key无关:
private static String FANOUT_EXCHANGE = "FANOUT_EXCHANGE";
static class DirectProducer {
public static void main(String[] args) throws IOException {
ConnectionFactory connectionFactory = new ConnectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel= connection.createChannel();;
String content = "I miss the conversation";
channel.exchangeDeclare(FANOUT_EXCHANGE, ExchangeTypes.FANOUT);
channel.basicPublish(FANOUT_EXCHANGE, "alvez", null, content.getBytes());
}
}
static class DirectConsumer {
public static void main(String[] args) throws IOException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel= connection.createChannel();
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, FANOUT_EXCHANGE, "");
QueueingConsumer consumer = new QueueingConsumer(channel);
String s = channel.basicConsume(queueName, true, consumer);
System.out.println(s);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("From:" + routingKey + "':'" + message + "'");
}
}
}
希望这篇文章可以帮助到你。总之,同学们,你想要的职坐标IT频道都能找到!
您输入的评论内容中包含违禁敏感词
我知道了
请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号