作者:Goland貓?
對于大型的互聯(lián)網(wǎng)應用程序,如電商平臺、社交網(wǎng)絡、金融交易平臺等,每秒鐘都會收到大量的請求。在這些應用程序中,需要使用高效的技術來應對高并發(fā)的請求,尤其是在短時間內處理大量的請求,如1分鐘百萬請求。
同時,為了降低用戶的使用門檻和提升用戶體驗,前端需要實現(xiàn)參數(shù)的無感知傳遞。這樣用戶在使用時,無需擔心參數(shù)傳遞的問題,能夠輕松地享受應用程序的服務。
在處理1分鐘百萬請求時,需要使用高效的技術和算法,以提高請求的響應速度和處理能力。Go語言以其高效性和并發(fā)性而聞名,因此成為處理高并發(fā)請求的優(yōu)秀選擇。Go中有多種模式可供選擇,如基于goroutine和channel的并發(fā)模型、使用池技術的協(xié)程模型等,以便根據(jù)具體應用的需要來選擇適合的技術模式。
本文代碼參考搬至
W1
W1 結構體類型,它有五個成員:
WgSend 用于等待任務發(fā)送的 goroutine 完成。
Wg 用于等待任務處理的 goroutine 完成。
MaxNum 表示 goroutine 池的大小。
Ch 是一個字符串類型的通道,用于傳遞任務。
DispatchStop 是一個空結構體類型的通道,用于停止任務分發(fā)。
?
type?W1?struct?{ ?WgSend???????*sync.WaitGroup ?Wg???????????*sync.WaitGroup ?MaxNum???????int ?Ch???????????chan?string ?DispatchStop?chan?struct{} }
?
接下來是 Dispatch 方法,它將任務發(fā)送到通道 Ch 中。它通過 for 循環(huán)來發(fā)送 10 倍于 MaxNum 的任務,每個任務都是一個 goroutine。defer 語句用于在任務完成時減少 WgSend 的計數(shù)。select 語句用于在任務分發(fā)被中止時退出任務發(fā)送。
Dispatch
?
func?(w?*W1)?Dispatch(job?string)?{ ?w.WgSend.Add(10?*?w.MaxNum) ?for?i?:=?0;?i?10*w.MaxNum;?i++?{ ??go?func(i?int)?{ ???defer?w.WgSend.Done() ???select?{ ???case?w.Ch?<-?fmt.Sprintf("%d",?i): ????return ???case?<-w.DispatchStop: ????fmt.Println("退出發(fā)送?job:?",?fmt.Sprintf("%d",?i)) ????return ???} ??}(i) ?} }
?
StartPool
然后是 StartPool 方法,它創(chuàng)建了一個 goroutine 池來處理從通道 Ch 中讀取到的任務。
如果通道 Ch 還沒有被創(chuàng)建,那么它將被創(chuàng)建。如果計數(shù)器 WgSend 還沒有被創(chuàng)建,那么它也將被創(chuàng)建。如果計數(shù)器 Wg 還沒有被創(chuàng)建,那么它也將被創(chuàng)建。
如果通道 DispatchStop 還沒有被創(chuàng)建,那么它也將被創(chuàng)建。
for 循環(huán)用于創(chuàng)建 MaxNum 個 goroutine 來處理從通道中讀取到的任務。defer 語句用于在任務完成時減少 Wg 的計數(shù)。
?
func?(w?*W1)?StartPool()?{ ?if?w.Ch?==?nil?{ ??w.Ch?=?make(chan?string,?w.MaxNum) ?} ?if?w.WgSend?==?nil?{ ??w.WgSend?=?&sync.WaitGroup{} ?} ?if?w.Wg?==?nil?{ ??w.Wg?=?&sync.WaitGroup{} ?} ?if?w.DispatchStop?==?nil?{ ??w.DispatchStop?=?make(chan?struct{}) ?} ?w.Wg.Add(w.MaxNum) ?for?i?:=?0;?i??
Stop
最后是 Stop 方法,它停止任務分發(fā)并等待所有任務完成。
它關閉了通道 DispatchStop,等待 WgSend 中的任務發(fā)送 goroutine 完成,然后關閉通道 Ch,等待 Wg 中的任務處理 goroutine 完成。
?
func?(w?*W1)?Stop()?{ ?close(w.DispatchStop) ?w.WgSend.Wait() ?close(w.Ch) ?w.Wg.Wait() }?
W2
SubWorker
?
type?SubWorker?struct?{ ?JobChan?chan?string }?
子協(xié)程,它有一個 JobChan,用于接收任務。
Run:SubWorker 的方法,用于啟動一個子協(xié)程,從 JobChan 中讀取任務并執(zhí)行。
?
func?(sw?*SubWorker)?Run(wg?*sync.WaitGroup,?poolCh?chan?chan?string,?quitCh?chan?struct{})?{ ?if?sw.JobChan?==?nil?{ ??sw.JobChan?=?make(chan?string) ?} ?wg.Add(1) ?go?func()?{ ??defer?wg.Done() ??for?{ ???poolCh?<-?sw.JobChan ???select?{ ???case?res?:=?<-sw.JobChan: ????fmt.Printf("完成工作:?%s? ",?res) ???case?<-quitCh: ????fmt.Printf("消費者結束......? ") ????return ???} ??} ?}() }?
W2
?
type?W2?struct?{ ?SubWorkers?[]SubWorker ?Wg?????????*sync.WaitGroup ?MaxNum?????int ?ChPool?????chan?chan?string ?QuitChan???chan?struct{} }?
Dispatch
Dispatch:W2 的方法,用于從 ChPool 中獲取 TaskChan,將任務發(fā)送給一個 SubWorker 執(zhí)行。
?
func?(w?*W2)?Dispatch(job?string)?{ ?jobChan?:=?<-w.ChPool ?select?{ ?case?jobChan?<-?job: ??fmt.Printf("發(fā)送任務?:?%s?完成? ",?job) ??return ?case?<-w.QuitChan: ??fmt.Printf("發(fā)送者(%s)結束? ",?job) ??return ?} }?
StartPool
StartPool:W2 的方法,用于初始化協(xié)程池,啟動所有子協(xié)程并把 TaskChan 存儲在 ChPool 中。
?
func?(w?*W2)?StartPool()?{ ?if?w.ChPool?==?nil?{ ??w.ChPool?=?make(chan?chan?string,?w.MaxNum) ?} ?if?w.SubWorkers?==?nil?{ ??w.SubWorkers?=?make([]SubWorker,?w.MaxNum) ?} ?if?w.Wg?==?nil?{ ??w.Wg?=?&sync.WaitGroup{} ?} ?for?i?:=?0;?i??
Stop
Stop:W2 的方法,用于停止協(xié)程的工作,并等待所有協(xié)程結束。
?
func?(w?*W2)?Stop()?{ ?close(w.QuitChan) ?w.Wg.Wait() ?close(w.ChPool) }?
DealW2 函數(shù)則是整個協(xié)程池的入口,它通過 NewWorker 方法創(chuàng)建一個 W2 實例,然后調用 StartPool 啟動協(xié)程池,并通過 Dispatch 發(fā)送任務,最后調用 Stop 停止協(xié)程池。
?
func?DealW2(max?int)?{ ?w?:=?NewWorker(w2,?max) ?w.StartPool() ?for?i?:=?0;?i?10*max;?i++?{ ??go?w.Dispatch(fmt.Sprintf("%d",?i)) ?} ?w.Stop() }?
個人見解
看到這里對于w2我已經(jīng)有點迷糊了,還能傳遞w.Wg, w.ChPool, w.QuitChan?
?
原來是golang里如果方法傳遞的不是地址,那么就會做一個拷貝,所以這里調用的wg根本就不是一個對象。 傳遞的地方傳遞地址就可以了,如果不傳遞地址,將會出現(xiàn)死鎖 go?doSomething(i,?&wg,?ch) func?doSomething(index?int,?wg?*sync.WaitGroup,?ch?chan?int)?{?
w1也有一個比較大的問題。在處理請求時,每個 Goroutine 都會占用一定的系統(tǒng)資源,如果請求量過大,會造成 Goroutine 數(shù)量的劇增,消耗過多系統(tǒng)資源,程序可能會崩潰
探究原文
在這段代碼中,poolCh代表工作者池,sw.JobChan代表工作者的工作通道。當一個工作者完成了工作后,它會將工作結果發(fā)送到sw.JobChan,此時可以通過case res := <-sw.JobChan:來接收該工作的結果。
在這個代碼塊中,還需要處理一個退出信號quitCh。因此,第二個case <-quitCh:用于檢測是否接收到了退出信號。如果接收到了退出信號,程序將打印出消息并結束。
需要注意的是,這兩個case語句是互斥的,只有當工作者完成工作或收到退出信號時,才會進入其中一個語句。因此,這個循環(huán)可以保證在工作者完成工作或收到退出信號時退出。
需要讀取兩次sw.JobChan的原因是:第一次讀取用于將工作者的工作通道放回工作者池中,這樣其他工作者就可以使用該通道。第二次讀取用于接收工作者的工作結果或退出信號。因此,這兩次讀取是為了確保能夠在正確的時刻將工作者的工作通道放回工作者池中并正確地處理工作結果或退出信號。
根據(jù)w2的特點 我自己寫了一個w2
?
import?( ???"fmt" ???"sync" ) type?SubWorkerNew?struct?{ ???JobChan?chan?string } type?W2New?struct?{ ???SubWorkers?[]SubWorkerNew ???Wg?????????*sync.WaitGroup ???MaxNum?????int ???ChPool?????chan?chan?string ???QuitChan???chan?struct{} } func?NewW2(maxNum?int)?*W2New?{ ???subWorkers?:=?make([]SubWorkerNew,?maxNum) ???for?i?:=?0;?i??
但是有幾個點需要注意
1.沒有考慮JobChan通道的緩沖區(qū)大小,如果有大量任務被并發(fā)分配,容易導致內存占用過高;
2.每個線程都會執(zhí)行無限循環(huán),此時線程退出的條件是接收到QuitChan通道的信號,可能導致線程的阻塞等問題;
3.Dispatch函數(shù)的默認情況下只會輸出"All workers busy",而不是阻塞,這意味著當所有線程都處于忙碌狀態(tài)時,任務會丟失
4.線程池啟動后無法動態(tài)擴展或縮小。
優(yōu)化
這個優(yōu)化版本改了很多次。有一些需要注意的點是,不然會一直死鎖
?
1.使用sync.WaitGroup來確保線程池中所有線程都能夠啟動并運行; 2.在Stop函數(shù)中,先向SubWorker的JobChan中發(fā)送一個關閉信號,再等待所有SubWorker線程退出; 3.在Dispatch函數(shù)中,將默認情況下的輸出改為阻塞等待可用通道;?
w2new
?
package?handle_million_requests import?( ?"fmt" ?"sync" ?"time" ) type?SubWorkerNew?struct?{ ?Id??????int ?JobChan?chan?string } type?W2New?struct?{ ?SubWorkers?[]SubWorkerNew ?MaxNum?????int ?ChPool?????chan?chan?string ?QuitChan???chan?struct{} ?Wg?????????*sync.WaitGroup } func?NewW2(maxNum?int)?*W2New?{ ?chPool?:=?make(chan?chan?string,?maxNum) ?subWorkers?:=?make([]SubWorkerNew,?maxNum) ?for?i?:=?0;?i??1?{ ??worker?:=?w.SubWorkers[w.MaxNum-1] ??close(worker.JobChan) ??w.MaxNum-- ??w.SubWorkers?=?w.SubWorkers[:w.MaxNum] ?} }?
AddWorker和RemoveWorker,用于動態(tài)擴展/縮小線程池。
在AddWorker函數(shù)中,我們首先將MaxNum增加了1,然后創(chuàng)建一個新的SubWorkerNew結構體,將其添加到SubWorkers中,并將其JobChan通道添加到ChPool通道中。最后,我們創(chuàng)建一個新的協(xié)程來處理新添加的SubWorkerNew并讓它進入無限循環(huán),等待接收任務。
在RemoveWorker函數(shù)中,我們首先將MaxNum減少1,然后獲取最后一個SubWorkerNew結構體,將它的JobChan通道發(fā)送到ChPool通道中,并從其通道中讀取任何待處理的任務,最后創(chuàng)建一個新的協(xié)程來處理SubWorkerNew,繼續(xù)處理任務。
測試用例
?
func?TestW2New(t?*testing.T)?{?? ????pool?:=?NewW2(3)?? ????pool.StartPool()?? ????pool.Dispatch("task?1")?? ????pool.Dispatch("task?2")?? ????pool.Dispatch("task?3")?? ????pool.AddWorker()?? ????pool.AddWorker()?? ????pool.RemoveWorker()?? ????pool.Stop()?? }?
當Dispatch函數(shù)向ChPool通道獲取可用通道時,會從通道中取出一個SubWorker的JobChan通道,并將任務發(fā)送到該通道中。而對于SubWorker來說,并沒有進行任務的使用次數(shù)限制,所以它可以處理多個任務。
在這個例子中,當任務數(shù)量比SubWorker數(shù)量多時,一個SubWorker的JobChan通道會接收到多個任務,它們會在SubWorker的循環(huán)中按順序依次處理,直到JobChan中沒有未處理的任務為止。因此,如果任務數(shù)量特別大,可能會導致某些SubWorker的JobChan通道暫時處于未處理任務狀態(tài),而其他的SubWorker在執(zhí)行任務。
在測試結果中,最后三行中出現(xiàn)了多個"SubWorker 0 processing job",說明SubWorker 0的JobChan通道接收了多個任務,并且在其循環(huán)中處理這些任務。下面的代碼片段顯示了這個過程:
// SubWorker 0 的循環(huán)部分
?
for?{ ????select?{ ????case?job?:=?<-subWorker.JobChan: ????????fmt.Printf("SubWorker?%d?processing?job?%s ",?subWorker.Id,?job) ????case?<-w.QuitChan: ????????return ????} }審核編輯:湯梓紅?
評論
查看更多