自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

RabbitMQ工作模式-Routing路由模式

開發(fā) 架構(gòu)
Routing模式要求隊(duì)列在綁定交換機(jī)時(shí)要指定Routing key,消息會轉(zhuǎn)發(fā)到符合Routing key的隊(duì)列。

Routing路由模式

1、模式說明

路由模式特點(diǎn):

  • 隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個(gè)RoutingKey(路由key)。
  • 消息的發(fā)送方在 向 Exchange發(fā)送消息時(shí),也必須指定消息的 RoutingKey。
  • Exchange不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的Routing Key進(jìn)行判斷,只有隊(duì)列的Routingkey與消息的 Routing key完全一致,才會接收到消息。

圖解:

  • P:生產(chǎn)者,向Exchange發(fā)送消息,發(fā)送消息時(shí),會指定一個(gè)routing key。
  • X:Exchange(交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給 與routing key完全匹配的隊(duì)列
  • C1:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 error 的消息
  • C2:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 info、error、warning 的消息

2。案例

在編碼上與 Publish/Subscribe發(fā)布與訂閱模式 的區(qū)別是交換機(jī)的類型為:Direct,還有隊(duì)列綁定交換機(jī)的時(shí)候需要指定routing key。

在寫案例之前,我們首先定義一下需求:

  • 生產(chǎn)者:發(fā)送兩條消息,一條消息的用于插入數(shù)據(jù),另一條消息用于更新數(shù)據(jù)。
  • 消費(fèi)者1:接收插入數(shù)據(jù)的消息,進(jìn)行數(shù)據(jù)插入。
  • 消費(fèi)者2:接收更新數(shù)據(jù)的消息,進(jìn)行數(shù)據(jù)更新。

(1)生產(chǎn)者

package com.lijw.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author Aron.li
 * @date 2022/3/3 8:16
 */
public class Producer_Routing {
    //交換機(jī)名稱
    static final String DIRECT_EXCHAGE = "direct_exchange";
    //隊(duì)列名稱
    static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
    //隊(duì)列名稱
    static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2. 設(shè)置參數(shù)
        factory.setHost("127.0.0.1"); // ip  默認(rèn)值 localhost
        factory.setPort(5672); //端口  默認(rèn)值 5672
        factory.setVirtualHost("/test"); //虛擬機(jī) 默認(rèn)值 /
        factory.setUsername("libai"); // 用戶名 默認(rèn) guest
        factory.setPassword("libai"); //密碼 默認(rèn)值 guest
        //3. 創(chuàng)建連接 Connection
        Connection connection = factory.newConnection();
        //4. 創(chuàng)建Channel
        Channel channel = connection.createChannel();
        //5. 創(chuàng)建交換機(jī)
        /*
           exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
           參數(shù):
            1. exchange:交換機(jī)名稱
            2. type:交換機(jī)類型
                DIRECT("direct"):定向
                FANOUT("fanout"):扇形(廣播),發(fā)送消息到每一個(gè)與之綁定隊(duì)列。
                TOPIC("topic") 通配符的方式
                HEADERS("headers") 參數(shù)匹配
            3. durable:是否持久化
            4. autoDelete:自動(dòng)刪除
            5. internal:內(nèi)部使用。 一般false
            6. arguments:參數(shù)
        */
        channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT, true, false, false, null);
        // 6.聲明(創(chuàng)建)隊(duì)列
        /**
         * 參數(shù)1:隊(duì)列名稱
         * 參數(shù)2:是否定義持久化隊(duì)列
         * 參數(shù)3:是否獨(dú)占本次連接
         * 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
         * 參數(shù)5:隊(duì)列其它參數(shù)
         */
        channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
        channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
        // 7. 綁定隊(duì)列和交換機(jī)
        /*
            queueBind(String queue, String exchange, String routingKey)
            參數(shù):
                1. queue:隊(duì)列名稱
                2. exchange:交換機(jī)名稱
                3. routingKey:路由鍵,綁定規(guī)則
                    如果交換機(jī)的類型為fanout ,routingKey設(shè)置為""
         */
        channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");
        channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "update");
        //8. 發(fā)送消息至交換機(jī),由交換機(jī)分發(fā)消息
        // 發(fā)送信息
        String message = "新增了商品。路由模式;routing key 為 insert " ;
        /**
         * 參數(shù)1:交換機(jī)名稱,如果沒有指定則使用默認(rèn)Default Exchage
         * 參數(shù)2:路由key,簡單模式可以傳遞隊(duì)列名稱
         * 參數(shù)3:消息其它屬性
         * 參數(shù)4:消息內(nèi)容
         */
        channel.basicPublish(DIRECT_EXCHAGE, "insert", null, message.getBytes());
        System.out.println("已發(fā)送消息:" + message);
        // 發(fā)送信息
        message = "修改了商品。路由模式;routing key 為 update" ;
        /**
         * 參數(shù)1:交換機(jī)名稱,如果沒有指定則使用默認(rèn)Default Exchage
         * 參數(shù)2:路由key,簡單模式可以傳遞隊(duì)列名稱
         * 參數(shù)3:消息其它屬性
         * 參數(shù)4:消息內(nèi)容
         */
        channel.basicPublish(DIRECT_EXCHAGE, "update", null, message.getBytes());
        System.out.println("已發(fā)送消息:" + message);
        //9. 釋放資源
        channel.close();
        connection.close();
    }
}

執(zhí)行發(fā)送消息:

發(fā)送消息之后,我們來看看聲明好的交換機(jī):

(2)消費(fèi)者1:專門接收 insert 的消息

package com.lijw.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Aron.li
 * @date 2022/3/2 16:16
 */
public class Consumer_Routing1 {

    //隊(duì)列名稱
    static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2. 設(shè)置參數(shù)
        factory.setHost("127.0.0.1"); // ip  默認(rèn)值 localhost
        factory.setPort(5672); //端口  默認(rèn)值 5672
        factory.setVirtualHost("/test"); //虛擬機(jī) 默認(rèn)值 /
        factory.setUsername("libai"); // 用戶名 默認(rèn) guest
        factory.setPassword("libai"); //密碼 默認(rèn)值 guest
        //3. 創(chuàng)建連接 Connection
        Connection connection = factory.newConnection();
        //4. 創(chuàng)建Channel
        Channel channel = connection.createChannel();
        //5. 創(chuàng)建隊(duì)列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        參數(shù):
            1. queue:隊(duì)列名稱
            2. durable:是否持久化,當(dāng)mq重啟之后,還在
            3. exclusive:
                * 是否獨(dú)占。只能有一個(gè)消費(fèi)者監(jiān)聽這隊(duì)列
                * 當(dāng)Connection關(guān)閉時(shí),是否刪除隊(duì)列
            4. autoDelete:是否自動(dòng)刪除。當(dāng)沒有Consumer時(shí),自動(dòng)刪除掉
            5. arguments:參數(shù)。

         */
        channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);

        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        參數(shù):
            1. queue:隊(duì)列名稱
            2. autoAck:是否自動(dòng)確認(rèn)
            3. callback:回調(diào)對象

         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回調(diào)方法,當(dāng)收到消息后,會自動(dòng)執(zhí)行該方法
                1. consumerTag:標(biāo)識
                2. envelope:獲取一些信息,交換機(jī),路由key...
                3. properties:配置信息
                4. body:數(shù)據(jù)
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收隊(duì)列的數(shù)據(jù) body: " + new String(body));
            }
        };
        channel.basicConsume(DIRECT_QUEUE_INSERT,true,consumer);

        //不需要關(guān)閉資源,因?yàn)橄M(fèi)者需要持續(xù)監(jiān)聽隊(duì)列信息
    }
}

(3)消費(fèi)者2:專門接收 update 的消息

package com.lijw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author Aron.li
 * @date 2022/3/2 16:16
 */
public class Consumer_Routing2 {
    //隊(duì)列名稱
    static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2. 設(shè)置參數(shù)
        factory.setHost("127.0.0.1"); // ip  默認(rèn)值 localhost
        factory.setPort(5672); //端口  默認(rèn)值 5672
        factory.setVirtualHost("/test"); //虛擬機(jī) 默認(rèn)值 /
        factory.setUsername("libai"); // 用戶名 默認(rèn) guest
        factory.setPassword("libai"); //密碼 默認(rèn)值 guest
        //3. 創(chuàng)建連接 Connection
        Connection connection = factory.newConnection();
        //4. 創(chuàng)建Channel
        Channel channel = connection.createChannel();
        //5. 創(chuàng)建隊(duì)列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        參數(shù):
            1. queue:隊(duì)列名稱
            2. durable:是否持久化,當(dāng)mq重啟之后,還在
            3. exclusive:
                * 是否獨(dú)占。只能有一個(gè)消費(fèi)者監(jiān)聽這隊(duì)列
                * 當(dāng)Connection關(guān)閉時(shí),是否刪除隊(duì)列
            4. autoDelete:是否自動(dòng)刪除。當(dāng)沒有Consumer時(shí),自動(dòng)刪除掉
            5. arguments:參數(shù)。
         */
        channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        參數(shù):
            1. queue:隊(duì)列名稱
            2. autoAck:是否自動(dòng)確認(rèn)
            3. callback:回調(diào)對象
         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回調(diào)方法,當(dāng)收到消息后,會自動(dòng)執(zhí)行該方法
                1. consumerTag:標(biāo)識
                2. envelope:獲取一些信息,交換機(jī),路由key...
                3. properties:配置信息
                4. body:數(shù)據(jù)
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收隊(duì)列的數(shù)據(jù) body: " + new String(body));
            }
        };
        channel.basicConsume(DIRECT_QUEUE_UPDATE,true,consumer);
        //不需要關(guān)閉資源,因?yàn)橄M(fèi)者需要持續(xù)監(jiān)聽隊(duì)列信息
    }
}

3、測試

啟動(dòng)所有消費(fèi)者,然后使用生產(chǎn)者發(fā)送消息;在消費(fèi)者對應(yīng)的控制臺可以查看到生產(chǎn)者發(fā)送對應(yīng)routing key對應(yīng)隊(duì)列的消息;到達(dá)按照需要接收的效果。

  • 消費(fèi)者1 收到了 insert 的消息

  • 消費(fèi)者2 收到了 update 的消息

4、小結(jié)

Routing模式要求隊(duì)列在綁定交換機(jī)時(shí)要指定routing key,消息會轉(zhuǎn)發(fā)到符合routing key的隊(duì)列。

責(zé)任編輯:姜華 來源: 今日頭條
相關(guān)推薦

2023-11-10 09:22:06

2021-04-18 21:07:32

門面模式設(shè)計(jì)

2021-04-14 09:02:22

模式 設(shè)計(jì)建造者

2023-05-17 08:16:04

RabbitMQ消息傳遞

2010-08-06 09:17:37

RIP路由協(xié)議

2012-10-08 11:18:38

企業(yè)應(yīng)用架構(gòu)工作單元模式

2021-08-11 17:22:11

設(shè)計(jì)模式單例

2023-09-26 01:21:34

2022-08-15 11:21:48

戴爾

2009-12-14 17:49:44

路由選擇協(xié)議

2025-04-21 04:00:00

2021-06-03 09:18:25

裝飾器模式包裝

2021-07-05 12:33:31

混合工作ITPagerDuty

2010-08-05 13:04:05

路由器

2024-01-01 08:19:32

模式History前端

2021-04-19 21:25:48

設(shè)計(jì)模式到元

2023-06-05 08:14:17

RabbitMQ兔子MQ開源

2020-12-07 11:23:22

云計(jì)算混合云

2020-12-03 10:51:45

云計(jì)算混合云IT

2025-01-26 15:13:00

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號