0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫(xiě)文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

關(guān)于PriorityBlockingQueue中隊(duì)列操作

openEuler ? 來(lái)源:openEuler ? 作者:openEuler ? 2022-05-07 16:43 ? 次閱讀

編者按:筆者在使用PriorityBlockingQueue實(shí)現(xiàn)按照優(yōu)先級(jí)處理任務(wù)時(shí)遇到一類(lèi)NPE問(wèn)題,經(jīng)過(guò)分析發(fā)現(xiàn)根本原因是在任務(wù)出隊(duì)列時(shí)調(diào)用比較器異常,進(jìn)而導(dǎo)致后續(xù)任務(wù)出隊(duì)列拋出NullPointerException。本文通過(guò)完整的案例復(fù)現(xiàn)來(lái)演示在什么情況會(huì)觸發(fā)該問(wèn)題,同時(shí)給出了處理建議。希望讀者在編程時(shí)加以借鑒,避免再次遇到此類(lèi)問(wèn)題。

背景知識(shí)

PriorityBlockingQueue是一個(gè)無(wú)界的基于數(shù)組的優(yōu)先級(jí)阻塞隊(duì)列,使用一個(gè)全局ReentrantLock來(lái)控制某一時(shí)刻只有一個(gè)線程可以進(jìn)行元素出隊(duì)和入隊(duì)操作,并且每次出隊(duì)都返回優(yōu)先級(jí)別最高的或者最低的元素。PriorityBlockingQueue通過(guò)以下兩種方式實(shí)現(xiàn)元素優(yōu)先級(jí)排序:

  1. 入隊(duì)元素實(shí)現(xiàn)Comparable接口來(lái)比較元素優(yōu)先級(jí);
  2. PriorityBlockingQueue構(gòu)造函數(shù)指定Comparator來(lái)比較元素優(yōu)先級(jí);

關(guān)于PriorityBlockingQueue中隊(duì)列操作的部分,基本和PriorityQueue邏輯一致,只不過(guò)在操作時(shí)加鎖了。在本文中我們主要關(guān)注PriorityBlockingQueue出隊(duì)的take方法,該方法通過(guò)調(diào)用dequeue方法將元素出隊(duì)列。當(dāng)沒(méi)有元素可以出隊(duì)的時(shí)候,線程就會(huì)阻塞等待。

publicEtake()throwsInterruptedException{
finalReentrantLocklock=this.lock;
lock.lockInterruptibly();
Eresult;
try{
//嘗試獲取最小元素,即小頂堆第一個(gè)元素,然后重新排序,如果不存在表示隊(duì)列暫無(wú)元素,進(jìn)行阻塞等待。
while((result=dequeue())==null)
notEmpty.await();
}finally{
lock.unlock();
}
returnresult;
}

現(xiàn)象

在某個(gè)業(yè)務(wù)服務(wù)中使用PriorityBlockingQueue實(shí)現(xiàn)按照優(yōu)先級(jí)處理任務(wù),某一天環(huán)境中的服務(wù)突然間不處理任務(wù)了,查看后臺(tái)日志,發(fā)現(xiàn)一直拋出NullPointerException。將進(jìn)程堆dump出來(lái),使用MAT發(fā)現(xiàn)某個(gè)PriorityBlockingQueue中的size值比實(shí)際元素個(gè)數(shù)多1個(gè)(入隊(duì)時(shí)已經(jīng)對(duì)任務(wù)進(jìn)行非空校驗(yàn))。

異常堆棧如下:

java.lang.NullPointerException
atjava.util.concurrent.PriorityBlockingQueue.siftDownComparable(PriorityBlockingQueue.java:404)
atjava.util.concurrent.PriorityBlockingQueue.dequeue(PriorityBlockingQueue.java:333)
atjava.util.concurrent.PriorityBlockingQueue.take(PriorityBlockingQueue.java:548)
...

MAT結(jié)果:

4f7ab378-cd26-11ec-bce3-dac502259ad0.png

原因分析

在此我們分析下PriorityBlockingQueue是如何出隊(duì)列的,PriorityBlockingQueue最終通過(guò)調(diào)用dequeue方法出隊(duì)列,dequeue方法處理邏輯如下:

  1. 將根節(jié)點(diǎn)(array[0])賦值給result;
  2. array[n] 賦值給 arrary[0];
  3. 將 array[n] 設(shè)置為 null;
  4. 調(diào)用siftDownComparable或siftDownUsingComparator對(duì)隊(duì)列元素重新排序;
  5. size大小減1;
  6. 返回result;

如果在第4步中出現(xiàn)異常,就會(huì)出現(xiàn)隊(duì)列中的元素個(gè)數(shù)比實(shí)際的元素個(gè)數(shù)多1個(gè)的現(xiàn)象。此時(shí)size未發(fā)生改變,arry[n]已經(jīng)被置為null,再進(jìn)行siftDown操作時(shí)就會(huì)拋出NullPointerException。繼續(xù)分析第4步中在什么情況下會(huì)出現(xiàn)異常,通過(guò)代碼走讀我們可以發(fā)現(xiàn)只有在調(diào)用Comparable#compareTo或者Comparator#compare方法進(jìn)行元素比較的時(shí)候才可能出現(xiàn)異常。這塊代碼的處理邏輯和業(yè)務(wù)相關(guān),如果業(yè)務(wù)代碼處理不當(dāng)拋出異常,就會(huì)導(dǎo)致上述現(xiàn)象。

/**
*Mechanicsforpoll().Callonlywhileholdinglock.
*/
privateEdequeue(){
intn=size-1;
if(n0)
returnnull;
else{
Object[]array=queue;
Eresult=(E)array[0];//step1
Ex=(E)array[n];//step2
array[n]=null;//step3
ComparatorsuperE>cmp=comparator;
if(cmp==null)//step4 如果指定了comparator,就按照指定的comparator來(lái)比較。否則就按照默認(rèn)的
siftDownComparable(0,x,array,n);
else
siftDownUsingComparator(0,x,array,n,cmp);
size=n;//step5
returnresult;//step6
}
}

privatestaticvoidsiftDownComparable(intk,Tx,Object[]array,intn){
if(n>0){
ComparablesuperT>key=(ComparablesuperT>)x;
inthalf=n>>>1;
while(kintchild=(k<1)+1;
Objectc=array[child];
intright=child+1;
if(rightsuperT>)c).compareTo((T)array[right])>0)
c=array[child=right];
if(key.compareTo((T)c)<=?0)
break;
array[k]=c;
k=child;
}
array[k]=key;
}
}
privatestaticvoidsiftDownUsingComparator(intk,Tx,Object[]array,intn,
ComparatorsuperT>cmp){
if(n>0){
inthalf=n>>>1;
while(kintchild=(k<1)+1;
Objectc=array[child];
intright=child+1;
if(right0)
c=array[child=right];
if(cmp.compare(x,(T)c)<=?0)
break;
array[k]=c;
k=child;
}
array[k]=x;
}
}

復(fù)現(xiàn)代碼

importjava.util.ArrayList;
importjava.util.List;
importjava.util.concurrent.PriorityBlockingQueue;

publicclassPriorityBlockingQueueTest{
staticclassEntityimplementsComparable<Entity>{
privateintid;
privateStringname;
privatebooleanflag;

publicvoidsetFlag(booleanflag){
this.flag=flag;
}

publicEntity(intid,Stringname){
this.id=id;
this.name=name;
}

@Override
publicintcompareTo(Entityentity){
if(flag){
thrownewRuntimeException("TestException");
}
if(entity==null||this.id>entity.id){
return1;
}
returnthis.id==entity.id?0:-1;
}
}

publicstaticvoidmain(String[]args){
intnum=5;
PriorityBlockingQueuepriorityBlockingQueue=newPriorityBlockingQueue<>();
Listentities=newArrayList<>();
for(inti=0;inewEntity(i,"entity"+i);
entities.add(entity);
priorityBlockingQueue.offer(entity);
}

entities.get(num-1).setFlag(true);
intsize=entities.size();
for(inti=0;itry{
priorityBlockingQueue.take();
}catch(Exceptione){
e.printStackTrace();
}
}
}

執(zhí)行結(jié)果如下:

java.lang.RuntimeException:TestException
atPriorityBlockingQueueTest$Entity.compareTo(PriorityBlockingQueueTest.java:31)
atPriorityBlockingQueueTest$Entity.compareTo(PriorityBlockingQueueTest.java:8)
atjava.util.concurrent.PriorityBlockingQueue.siftDownComparable(PriorityBlockingQueue.java:404)
atjava.util.concurrent.PriorityBlockingQueue.dequeue(PriorityBlockingQueue.java:333)
atjava.util.concurrent.PriorityBlockingQueue.take(PriorityBlockingQueue.java:548)
atPriorityBlockingQueueTest.main(PriorityBlockingQueueTest.java:71)
java.lang.NullPointerException
atjava.util.concurrent.PriorityBlockingQueue.siftDownComparable(PriorityBlockingQueue.java:404)
atjava.util.concurrent.PriorityBlockingQueue.dequeue(PriorityBlockingQueue.java:333)
atjava.util.concurrent.PriorityBlockingQueue.take(PriorityBlockingQueue.java:548)
atPriorityBlockingQueueTest.main(PriorityBlockingQueueTest.java:71)

規(guī)避方案

可以通過(guò)以下兩種方法規(guī)避:

  • 在take方法出現(xiàn)NPE時(shí),清除隊(duì)列元素,將未處理的元素重新進(jìn)入隊(duì)列;
  • 在 Comparable#compareTo 或 Comparator#compare 方法中做好異常處理,對(duì)異常情況進(jìn)行默認(rèn)操作;

建議使用后者。

案例引申

使用PriorityBlockingQueue作為緩存隊(duì)列來(lái)創(chuàng)建線程池時(shí),使用submit提交任務(wù)會(huì)出現(xiàn) java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to 異常,而使用execute沒(méi)有問(wèn)題。

觀察submit源碼可以發(fā)現(xiàn)在submit內(nèi)部代碼會(huì)將Runable封裝成RunnableFuture對(duì)象,然后調(diào)用execute提交任務(wù)。

publicFuturesubmit(Runnabletask){
if(task==null)thrownewNullPointerException();
RunnableFutureftask=newTaskFor(task,null);
execute(ftask);
returnftask;
}

以Comparable為例,任務(wù)入隊(duì)列時(shí),最終會(huì)調(diào)用siftUpComparable方法。該方法第一步將RunnableFuture強(qiáng)轉(zhuǎn)為Comparable類(lèi)型,而RunnableFuture類(lèi)未實(shí)現(xiàn)Comparable接口,進(jìn)而拋出ClassCastException異常。

publicbooleanoffer(Ee){
if(e==null)
thrownewNullPointerException();
finalReentrantLocklock=this.lock;
lock.lock();
intn,cap;
Object[]array;
while((n=size)>=(cap=(array=queue).length))
tryGrow(array,cap);
try{
ComparatorsuperE>cmp=comparator;
if(cmp==null)
siftUpComparable(n,e,array);
else
siftUpUsingComparator(n,e,array,cmp);
size=n+1;
notEmpty.signal();
}finally{
lock.unlock();
}
returntrue;
}

privatestaticvoidsiftUpComparable(intk,Tx,Object[]array){
ComparablesuperT>key=(ComparablesuperT>)x;
while(k>0){
intparent=(k-1)>>>1;
Objecte=array[parent];
if(key.compareTo((T)e)>=0)
break;
array[k]=e;
k=parent;
}
array[k]=key;
}

這也是常見(jiàn)的比較器調(diào)用異常案例,本文不再贅述,可自行參考其他文章。

總結(jié)

在使用PriorityBlockingQueue時(shí),注意在比較器中做好異常處理,避免出現(xiàn)類(lèi)似問(wèn)題。

審核編輯 :李倩


聲明:本文內(nèi)容及配圖由入駐作者撰寫(xiě)或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問(wèn)題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 比較器
    +關(guān)注

    關(guān)注

    14

    文章

    1656

    瀏覽量

    107329
  • 數(shù)組
    +關(guān)注

    關(guān)注

    1

    文章

    417

    瀏覽量

    25988

原文標(biāo)題:畢昇 JDK | PriorityBlockingQueue比較器異常導(dǎo)致的NPE問(wèn)題分析

文章出處:【微信號(hào):openEulercommunity,微信公眾號(hào):openEuler】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏

    評(píng)論

    相關(guān)推薦

    JavaWeb消息隊(duì)列使用指南

    在現(xiàn)代的JavaWeb應(yīng)用中,消息隊(duì)列(Message Queue)是一種常見(jiàn)的技術(shù),用于異步處理任務(wù)、解耦系統(tǒng)組件、提高系統(tǒng)性能和可靠性。 1. 消息隊(duì)列的基本概念 消息隊(duì)列是一種應(yīng)用程序?qū)?yīng)
    的頭像 發(fā)表于 11-25 09:27 ?172次閱讀

    探索字節(jié)隊(duì)列的魔法:多類(lèi)型支持、函數(shù)重載與線程安全

    的數(shù)據(jù)結(jié)構(gòu),它能夠高效地存儲(chǔ)和管理數(shù)據(jù)流。通過(guò)使用字節(jié)隊(duì)列,我們可以靈活地處理不同類(lèi)型的數(shù)據(jù)、確保數(shù)據(jù)的完整性,并在多線程環(huán)境中安全地進(jìn)行操作。本文將深入探討字節(jié)
    的頭像 發(fā)表于 11-15 01:08 ?838次閱讀
    探索字節(jié)<b class='flag-5'>隊(duì)列</b>的魔法:多類(lèi)型支持、函數(shù)重載與線程安全

    為什么同一個(gè)隊(duì)列引用的全局變量,運(yùn)行在兩個(gè)子vi中發(fā)現(xiàn)隊(duì)列數(shù)據(jù)丟失了

    我創(chuàng)建了一個(gè)隊(duì)列,然后將隊(duì)列引用做了個(gè)全局變量,運(yùn)行在兩個(gè)子vi中,一個(gè)是只入隊(duì)列,另一個(gè)是只出隊(duì)列。但我發(fā)現(xiàn),一個(gè)字vi數(shù)據(jù)入隊(duì)列成功,檢
    發(fā)表于 11-14 11:47

    嵌入式環(huán)形隊(duì)列與消息隊(duì)列的實(shí)現(xiàn)原理

    嵌入式環(huán)形隊(duì)列,也稱為環(huán)形緩沖區(qū)或循環(huán)隊(duì)列,是一種先進(jìn)先出(FIFO)的數(shù)據(jù)結(jié)構(gòu),用于在固定大小的存儲(chǔ)區(qū)域中高效地存儲(chǔ)和訪問(wèn)數(shù)據(jù)。其主要特點(diǎn)包括固定大小的數(shù)組和兩個(gè)指針(頭指針和尾指針),分別指向隊(duì)列的起始位置和結(jié)束位置。
    的頭像 發(fā)表于 09-02 15:29 ?605次閱讀

    玩轉(zhuǎn)RT-Thread之消息隊(duì)列的應(yīng)用

    在嵌入式系統(tǒng)開(kāi)發(fā)中,實(shí)時(shí)處理串口和ADC數(shù)據(jù)是一項(xiàng)重要的任務(wù)。本文將介紹如何在RT-Thread實(shí)時(shí)操作系統(tǒng)中,利用消息隊(duì)列來(lái)同時(shí)處理來(lái)自串口和ADC的數(shù)據(jù)。通過(guò)這種方法,我們能夠高效地管理和處理
    的頭像 發(fā)表于 07-23 08:11 ?637次閱讀
    玩轉(zhuǎn)RT-Thread之消息<b class='flag-5'>隊(duì)列</b>的應(yīng)用

    freertos啟用IAR自帶插件調(diào)試時(shí)不能查看隊(duì)列信息怎么解決?

    在IAR平臺(tái)上調(diào)試freertos,想利用IAR自帶的freertos插件進(jìn)行調(diào)試,但是只能看task的信息,不能看隊(duì)列信息顯示
    發(fā)表于 05-07 06:54

    嵌入式實(shí)時(shí)操作系統(tǒng)中的隊(duì)列管理與應(yīng)用

    任務(wù) A 將信息存入隊(duì)列,任務(wù)B以先進(jìn)先出的方式提取信息。隊(duì)列通常應(yīng)足夠大,可以承載許多數(shù)據(jù),而不僅僅承載單個(gè)數(shù)據(jù)項(xiàng)。因此,它可以充當(dāng)緩沖或暫存器,為管道提供靈活性。
    發(fā)表于 04-30 14:27 ?634次閱讀
    嵌入式實(shí)時(shí)<b class='flag-5'>操作</b>系統(tǒng)中的<b class='flag-5'>隊(duì)列</b>管理與應(yīng)用

    Freertos隊(duì)列項(xiàng)里的字節(jié)長(zhǎng)度是否可以獲?。?/a>

    最近剛學(xué)Freertos, 看到可以獲取Freertos隊(duì)列長(zhǎng)度,但是隊(duì)列項(xiàng)里的字節(jié)長(zhǎng)度是否可以獲取? 因?yàn)轫?xiàng)目中隊(duì)列中會(huì)存放不定長(zhǎng)字節(jié),需要對(duì)隊(duì)列中的數(shù)據(jù)分揀,每次分揀的時(shí)候遍歷所
    發(fā)表于 04-29 07:17

    freertos隊(duì)列錯(cuò)亂是什么原因?qū)е碌模?/a>

    ,m_tMsgudp3->ucData,m_tMsgudp3->usLenth, remote_ip, remote_port); 接收隊(duì)列的時(shí)候,操作頻繁(周期大概40ms),發(fā)送
    發(fā)表于 04-26 06:20

    求助,關(guān)于FreeRTOS的相關(guān)疑問(wèn)求解

    或者說(shuō)隊(duì)列不是都可以實(shí)現(xiàn)嗎,那為什么還要用他呢。難道說(shuō)這只是為了省內(nèi)存嘛? 2.還有就是在實(shí)時(shí)操作系統(tǒng)下,所謂的狀態(tài)機(jī)是不是就不那么必要了。 我沒(méi)有系統(tǒng)學(xué)習(xí),也不太懂,希望大佬們能指點(diǎn)我一下,謝謝啦。
    發(fā)表于 04-24 07:08

    進(jìn)程間通信的消息隊(duì)列介紹

    消息隊(duì)列是一種非常常見(jiàn)的進(jìn)程間通信方式。
    的頭像 發(fā)表于 04-08 17:27 ?325次閱讀

    嵌入式實(shí)時(shí)操作系統(tǒng)優(yōu)先級(jí)搶占式調(diào)度機(jī)制解析

    當(dāng)搶占發(fā)生時(shí),任務(wù)即使沒(méi)有完成也會(huì)被迫放棄處理器,此時(shí)任務(wù)并沒(méi)有被掛起,而是會(huì)返回就緒隊(duì)列。任務(wù)在隊(duì)列中的位置是由優(yōu)先級(jí)決定的,它會(huì)在隊(duì)列中等待到下一次被調(diào)度。
    的頭像 發(fā)表于 04-05 05:22 ?2886次閱讀
    嵌入式實(shí)時(shí)<b class='flag-5'>操作</b>系統(tǒng)優(yōu)先級(jí)搶占式調(diào)度機(jī)制解析

    MCU專屬隊(duì)列功能模塊之QueueForMcu應(yīng)用

    當(dāng)需要從隊(duì)列頭部獲取多個(gè)數(shù)據(jù),但又不希望數(shù)據(jù)從隊(duì)列中刪除時(shí),可以使用 Queue_Peek_Array 函數(shù)來(lái)實(shí)現(xiàn),該函數(shù)的參數(shù)與返回值與 Queue_Pop_Array 完全相同。
    發(fā)表于 03-20 11:44 ?533次閱讀
    MCU專屬<b class='flag-5'>隊(duì)列</b>功能模塊之QueueForMcu應(yīng)用

    TC399 adc能添加到同一個(gè)隊(duì)列中并得到結(jié)果嗎?加入隊(duì)列是否有任何限制?

    添加到隊(duì)列中并得到結(jié)果。 我的疑問(wèn)是,有了這些不同的頻道和組,我還能把它們添加到同一個(gè)隊(duì)列中并得到結(jié)果嗎?加入隊(duì)列是否有任何限制?
    發(fā)表于 03-04 06:33

    裸機(jī)中環(huán)形隊(duì)列與RTOS中消息隊(duì)列有何區(qū)別呢?

    “環(huán)形隊(duì)列”和“消息隊(duì)列”在嵌入式領(lǐng)域有應(yīng)用非常廣泛,相信有經(jīng)驗(yàn)的嵌入式軟件工程師對(duì)它們都不陌生。
    的頭像 發(fā)表于 01-26 09:38 ?735次閱讀
    裸機(jī)中環(huán)形<b class='flag-5'>隊(duì)列</b>與RTOS中消息<b class='flag-5'>隊(duì)列</b>有何區(qū)別呢?