博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ入门(4)——路由(Routing)
阅读量:4963 次
发布时间:2019-06-12

本文共 5532 字,大约阅读时间需要 18 分钟。

这一篇我们将介绍如何订阅消息的一个子集。例如,我们只需要将日志中的error消息存储到日志文件中而将所有日志消息都在控制台打印出来。

绑定(Bindings)

在前面的例子中,我们创建了交换机和队列的绑定关系:

channel.queueBind(queueName, EXCHANGE_NAME, "");

绑定交换机和队列可以理解为队列对这个交换机上的消息感兴趣。

绑定可以添加额外的参数routingKey,称之为绑定键(binding key)。下面是我们如何使用键创建绑定的方法:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

绑定键的含义取决于交换类型。对于fanout类型而言,忽略这个参数。

直接交换(Direct exchange)

在前面的例子中,广播所有的消息到所有的消费者。这里,我们希望将它扩展为基于其严重性的过滤消息。例如,我们希望将重大错误信息写入磁盘,而不要浪费磁盘存储警告或消息类型的日志。

之前使用的fanout交换并没有给我们带来灵活性,它只能进行盲目地转发消息。

这边我们将使用直接交换。直接交换背后的算法很简单:消息将发送到绑定键与消息的路由键完全匹配的队列。

参考下面的图解:

525435-20170813223800945-1698142434.png

我们可以看到,直接交换机x上绑定了两个队列。Q1队列与绑定键orange绑定,Q2队列与两个绑定键绑定,绑定键balck和绑定键green。

这样,附带orange绑定键的消息将会发送到Q1队列,附带black或green绑定键的消息会发送Q2队列,其他消息会丢弃。

多重绑定(Multiple bindings)

525435-20170813223811226-1958003537.png

使用相同的绑定键绑定多个队列是完全合法的。如上图,一个附带了绑定键black的消息将会发送到Q1和Q2。这种情况下就成了特殊的fanout交换了。

发出日志(Emitting logs)

这里我们将使用direct交换,根据日志的严重性决定路由键。接收消息的程序根据消息的严重性接收。

首先,创建一个交换机:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

发送消息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

为了说明问题,假定严重性为:info,warning,error

订阅(Subscribing)

接收消息的方式基本和前面介绍过的一致,不同点在于我们将为每一个感兴趣的严重性创建绑定:

String queueName = channel.queueDeclare().getQueue();for(String severity : argv){  channel.queueBind(queueName, EXCHANGE_NAME, severity);}

代码清单

发送端:

package com.xxyh.rabbitmq;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.Random;import java.util.UUID;import java.util.concurrent.TimeoutException;public class EmitLogDirect {    private static final String EXCHANGE_NAME = "direct_logs";    private static final String[] SEVERITIES = {"info", "warning", "error"};    public static void main(String[] args) throws IOException, TimeoutException {        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("localhost");        Connection connection = factory.newConnection();        Channel channel = connection.createChannel();        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        // 发送10条消息        for (int i = 0; i < 10; i++) {            String severity = getSeverity();            String message = severity + "... log ..." + UUID.randomUUID().toString();            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("utf-8"));            System.out.println(Thread.currentThread().getName()+" 发送消息:" + message);        }        channel.close();        connection.close();    }    // 随机产生一种消息类型    private static String getSeverity() {        Random random = new Random();        int i = random.nextInt(3);        return SEVERITIES[i];    }}

接收端:

package com.xxyh.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;import java.util.Random;import java.util.concurrent.TimeoutException;public class ReceiveLogsDirect {    private static final String EXCHANGE_NAME = "direct_logs";    private static final String[] SEVERITIES = {"info", "warning", "error"};    public static void main(String[] args) throws IOException, TimeoutException {        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("localhost");        Connection connection = factory.newConnection();        Channel channel = connection.createChannel();        String queue = channel.queueDeclare().getQueue();        String severity = getSeverity();        channel.queueBind(queue, EXCHANGE_NAME, severity);        final Consumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "utf-8");                System.out.println(Thread.currentThread().getName()+" 接收消息: " + message);            }        };        channel.basicConsume(queue, true, consumer);    }    // 随机产生一种消息类型    private static String getSeverity() {        Random random = new Random();        int i = random.nextInt(3);        return SEVERITIES[i];    }}

多开启几个接收端,运气好的话,三个接收端分别收到三种类型的日志:

Recv1:

pool-1-thread-4 接收消息: error... log ...3f8faf78-12ca-45b6-91e3-7f6ac77912b5pool-1-thread-4 接收消息: error... log ...d76d703a-af5a-412e-a534-b567ed94c743pool-1-thread-5 接收消息: error... log ...8ac89111-5c6a-4c0f-a85d-31d1c66dcafdpool-1-thread-6 接收消息: error... log ...c0054e75-5186-436b-8f19-033ccc725fcdpool-1-thread-7 接收消息: error... log ...c2dc8196-6a11-4ea8-b02b-98eff0ff64a1

Recv2:

pool-1-thread-4 接收消息: warning... log ...8cc696ce-0b07-4223-bfc6-3be376fa3d7cpool-1-thread-5 接收消息: warning... log ...579185f7-e09f-4bcf-81eb-85e35ebc9a5apool-1-thread-6 接收消息: warning... log ...38edb505-a944-4f64-8843-766dfca6a6ae

Recv3:

pool-1-thread-4 接收消息: info... log ...3e7ad3d4-dfd7-4f06-b4d7-bbe50ed0c9e6pool-1-thread-5 接收消息: info... log ...5b9b3035-6b12-4c60-9907-ec8d8c7d21ba

Send:

main 发送消息:error... log ...3f8faf78-12ca-45b6-91e3-7f6ac77912b5main 发送消息:error... log ...d76d703a-af5a-412e-a534-b567ed94c743main 发送消息:warning... log ...8cc696ce-0b07-4223-bfc6-3be376fa3d7cmain 发送消息:info... log ...3e7ad3d4-dfd7-4f06-b4d7-bbe50ed0c9e6main 发送消息:warning... log ...579185f7-e09f-4bcf-81eb-85e35ebc9a5amain 发送消息:info... log ...5b9b3035-6b12-4c60-9907-ec8d8c7d21bamain 发送消息:error... log ...8ac89111-5c6a-4c0f-a85d-31d1c66dcafdmain 发送消息:error... log ...c0054e75-5186-436b-8f19-033ccc725fcdmain 发送消息:error... log ...c2dc8196-6a11-4ea8-b02b-98eff0ff64a1main 发送消息:warning... log ...38edb505-a944-4f64-8843-766dfca6a6ae

转载于:https://www.cnblogs.com/xiaoxiaoyihan/p/7355313.html

你可能感兴趣的文章
Windows向Linux上传文件夹
查看>>
20180104-高级特性-Slice
查看>>
6个SQL Server 2005性能优化工具介绍
查看>>
nginx启动、关闭命令、重启nginx报错open() "/var/run/nginx/nginx.pid" failed
查看>>
BZOJ 3097 Hash Killer I
查看>>
UINavigationController的视图层理关系
查看>>
html阴影效果怎么做,css 内阴影怎么做
查看>>
宏观经济
查看>>
综合练习:词频统计
查看>>
BZOJ1026: [SCOI2009]windy数
查看>>
样板操作数
查看>>
64位UBUNTU下安装adobe reader后无法启动
查看>>
iTextSharp带中文转换出来的PDF文档显示乱码
查看>>
组件:slot插槽
查看>>
走进C++程序世界------异常处理
查看>>
Nginx配置文件nginx.conf中文详解(转)
查看>>
POJ 1308 Is It A Tree?(并查集)
查看>>
N进制到M进制的转换问题
查看>>
利用sed把一行的文本文件改成每句一行
查看>>
Android应用开发:核心技术解析与最佳实践pdf
查看>>