0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

RabbitMQ通信模型中的work模型

科技綠洲 ? 來源:Java技術(shù)指北 ? 作者:Java技術(shù)指北 ? 2023-09-25 14:34 ? 次閱讀

上一篇文章中,簡單的介紹了一下RabbitMQ,以及安裝和hello world。

有的小伙伴留言說看不懂其中的方法參數(shù),這里先解釋一下幾個基本的方法參數(shù)。

// 聲明隊列方法
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
 * param1:queue 隊列的名字
 * param2:durable 是否持久化;比如現(xiàn)在發(fā)送到隊列里面的消息,如果沒有持久化,重啟這個隊列后數(shù) 據(jù)會丟失(false) true:重啟之后數(shù)據(jù)依然在
 * param3:exclusive 是否排外(是否是當前連接的專屬隊列),排外的意思是:
 *            1:連接關(guān)閉之后 這個隊列是否自動刪除(false:不自動刪除)
 *            2:是否允許其他通道來進行訪問這個數(shù)據(jù)(false:不允許) 
 * param4:autoDelete 是否自動刪除
 *            就是當最后一個連接斷開的時候,是否自動刪除這個隊列(false:不刪除)
 * param5:arguments(map) 聲明隊列的時候,附帶的一些參數(shù)
 */
// 發(fā)送數(shù)據(jù)到隊列
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, "第一個隊列消息...".getBytes());
/**
 * param1:exchange  交換機  沒有就設(shè)置為 "" 值就可以了
 * param2:routingKey 路由的key  現(xiàn)在沒有設(shè)置key,直接使用隊列的名字
 * param3:BasicProperties 發(fā)送數(shù)據(jù)到隊列的時候,是否要帶一些參數(shù)。
 *      MessageProperties.PERSISTENT_TEXT_PLAIN表示沒有帶任何參數(shù)
 * param4:body 向隊列中發(fā)送的消息數(shù)據(jù)
 */

Work模型

work模型稱為工作隊列或者競爭消費者模式,多個消費者消費的數(shù)據(jù)之和才是原來隊列中的所有數(shù)據(jù),適用于流量的削峰。

圖片
img

演示

寫個簡單的測試:

  1. 生產(chǎn)者

    public class Producer {
        private static final String QUEUE_NAME = "queue_work_1";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 100; i++) {
                channel.basicPublish("", QUEUE_NAME, null, ("work模型:" + i).getBytes());
            }
            channel.close();
            connection.close();
        }
    
    }
    
  2. 消費者

    // 消費者1
    public class Consumer {
        private static final String QUEUE_NAME = "queue_work_1";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // channel.basicQos(0, 1, false);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(System.currentTimeMillis() + "消費者1接收到信息:" + new String(body));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
        }
    
    }
    
    // 消費者2
    public class Consumer2 {
        private static final String QUEUE_NAME = "queue_work_1";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // channel.basicQos(0, 1, false);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(System.currentTimeMillis() + "消費者2接收到信息:" + new String(body));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    // 這里加了個延遲,表示處理業(yè)務時間
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
        }
    }
    
  3. 結(jié)果
    圖片
    image-20221229210012145

圖片
image-20221229210046184

可以看出來:100條消息,消費者之間是平分的,消費者1 幾乎是瞬間完成,消費者2 則是慢慢吞吞的運行完畢,消費者1大量時間處于空閑狀態(tài),消費者2則一直忙碌。這顯然是不適用于實際開發(fā)中。

我們需要遵從一個原則,就是 能者多勞 ,消費越快的人,消費的越多;

現(xiàn)在我們把消費者1和2的代碼中 // channel.basicQos(0, 1, false); 這行代碼取消注釋,再次運行;

圖片
image-20221229211317632

圖片
image-20221229211335782

現(xiàn)在的結(jié)果就比較符合能者多勞,雖然你干的多,但是工資是一樣的呀~

work模型的一個主要的方法是basicQos();這里也解釋一下其參數(shù):

// 設(shè)置限流機制
channel.basicQos(0, 1, false);
/**  
 *  param1: prefetchSize,消息本身的大小 如果設(shè)置為0  那么表示對消息本身的大小不限制
 *  param2: prefetchCount,告訴rabbitmq不要一次性給消費者推送大于N個消息
 *  param3:global,是否將上面的設(shè)置應用于整個通道,false表示只應用于當前消費者
 */

小結(jié)

本文到這里就結(jié)束了,主要介紹了RabbitMQ通信模型中的work模型,適用于限流、削峰等應用場景。

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學習之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 數(shù)據(jù)
    +關(guān)注

    關(guān)注

    8

    文章

    7232

    瀏覽量

    90714
  • 通信
    +關(guān)注

    關(guān)注

    18

    文章

    6141

    瀏覽量

    137138
  • 模型
    +關(guān)注

    關(guān)注

    1

    文章

    3462

    瀏覽量

    49789
  • Work
    +關(guān)注

    關(guān)注

    0

    文章

    9

    瀏覽量

    9130
  • rabbitmq
    +關(guān)注

    關(guān)注

    0

    文章

    19

    瀏覽量

    1104
收藏 0人收藏

    評論

    相關(guān)推薦

    RabbitMQ的發(fā)布訂閱模型

    上一篇文章,簡單的介紹了一下RabbitMQwork模型。這篇文章來學習一下RabbitMQ
    的頭像 發(fā)表于 09-25 14:30 ?683次閱讀
    <b class='flag-5'>RabbitMQ</b><b class='flag-5'>中</b>的發(fā)布訂閱<b class='flag-5'>模型</b>

    CAN總線通信協(xié)議模型概述 CAN總線通信模型作用

    參照 ISO/OSI 標準模型,CAN 總線的通信參考模型如圖 9-1 所示。這 4 層結(jié)構(gòu)的功能如下:? 物理層規(guī)定了節(jié)點的全部電氣特性,在一個網(wǎng)絡里,要實現(xiàn)不同節(jié)點間的數(shù)據(jù)傳輸,所有節(jié)點的物理層
    發(fā)表于 12-14 14:17

    MQTT的通信模型及消息

     MQTT通信模型    MQTT協(xié)議是基于客戶端-服務器模型,在協(xié)議主要有三種身份:發(fā)布者(Publisher)、服務器(Broker) 以及訂閱者(Subscriber)。 并且消息發(fā)布者可以
    發(fā)表于 01-19 15:57

    基于VxWorks實時操作系統(tǒng)的通信模型該怎樣去設(shè)計?

    多任務實時操作系統(tǒng)VxWorks是什么?與傳統(tǒng)通信機制相比,模塊間通信模型有什么優(yōu)勢?基于VxWorks實時操作系統(tǒng)的通信模型該怎樣去設(shè)計?
    發(fā)表于 04-26 06:25

    移動Agent位置透明通信模型的設(shè)計

    提出一種高效可靠的移動Agent通信模型――D-C通信模型,結(jié)合域名字解析器和移動Agent系統(tǒng)的Communicator實現(xiàn)移動Agent之間的通信。通過引入一種基于全局的、與位置
    發(fā)表于 04-16 08:53 ?26次下載

    過程控制工業(yè)以太網(wǎng)通信模型探討

    提出了建立在交換式以太網(wǎng)和IEEE 802.1Q/P 技術(shù)基礎(chǔ)上用于過程控制的以太網(wǎng)通信模型REPC,并進行了分析。關(guān)鍵詞:通信模型工業(yè)以太網(wǎng) 過程控制Abstract: REPC, a communication model of in
    發(fā)表于 06-19 08:34 ?27次下載

    數(shù)據(jù)網(wǎng)格基于優(yōu)化機制的通信模型

    針對基于多計算機機群構(gòu)成的網(wǎng)格的大規(guī)模并行計算的需要,對多級分組通信模型的單一機群分組通信進行了研究。探討了在單一機群內(nèi)的主動節(jié)點、被動節(jié)點個數(shù)和各個計算節(jié)點
    發(fā)表于 06-25 13:52 ?12次下載

    基于VxWorks的通信模型設(shè)計

    本文提出了一種任務間的通信模型,將用于網(wǎng)絡通信的UDP方式引進到任務間的通信中,使通信更加靈活和便于管理,改善了整個系統(tǒng)的性能。
    發(fā)表于 06-01 10:07 ?1111次閱讀
    基于VxWorks的<b class='flag-5'>通信模型</b>設(shè)計

    企業(yè)資產(chǎn)管理系統(tǒng)通信模型的研究與實現(xiàn)

    為了改善企業(yè)資產(chǎn)管理(EAM)系統(tǒng)在用戶體驗、模塊間數(shù)據(jù)傳輸效率及耦合度等方面的不足,構(gòu)建了基于Silverlight與WCF技術(shù)研究與實現(xiàn)EAM系統(tǒng)通信模型。利用Silverlight構(gòu)建客戶端提升
    發(fā)表于 07-06 16:57 ?34次下載
    企業(yè)資產(chǎn)管理系統(tǒng)<b class='flag-5'>中</b><b class='flag-5'>通信模型</b>的研究與實現(xiàn)

    網(wǎng)絡通信模型

    網(wǎng)絡通信模型,在基礎(chǔ)講解的前提下,建立數(shù)學模型來分析。
    發(fā)表于 03-15 13:56 ?9次下載

    一種基于Kademlia的P2P語音通信模型

    一種基于Kademlia的P2P語音通信模型_陳立全
    發(fā)表于 01-07 16:52 ?3次下載

    基于Zigbee的無線智能輸液通信模型設(shè)計楊艷

    基于Zigbee的無線智能輸液通信模型設(shè)計_楊艷
    發(fā)表于 03-16 08:00 ?3次下載

    Topic 模型的使用

    RabbitMQ 是一個流行的開源消息隊列軟件,它提供了多種通信模型,例如發(fā)布/訂閱模型、路由模型、work
    的頭像 發(fā)表于 09-25 11:30 ?828次閱讀

    RabbitMQ的路由模型(direct)

    路由模型 RabbitMQ 提供了五種不同的通信模型,上一篇文章,簡單的介紹了一下RabbitMQ的發(fā)布訂閱
    的頭像 發(fā)表于 09-25 11:32 ?641次閱讀

    什么是通信模型DDS

    完成的,它相當于是ROS機器人系統(tǒng)的神經(jīng)網(wǎng)絡。 通信模型 DDS的核心是通信,能夠?qū)崿F(xiàn)通信模型和軟件框架非常多,這里我們列出常用的四種
    的頭像 發(fā)表于 11-24 17:50 ?1868次閱讀

    電子發(fā)燒友

    中國電子工程師最喜歡的網(wǎng)站

    • 2931785位工程師會員交流學習
    • 獲取您個性化的科技前沿技術(shù)信息
    • 參加活動獲取豐厚的禮品