關(guān)于epoll
驚群?jiǎn)栴},什么是驚群呢?
比如我們?cè)趯?xiě)代碼過(guò)程中,使用兩個(gè)線程的epoll
監(jiān)聽(tīng)socket
,當(dāng)socket
上有事件發(fā)生時(shí),兩個(gè)epoll
都會(huì)被喚醒,導(dǎo)致會(huì)操作同一個(gè)socket
,這就是驚群,那如何解決呢?
(1)使用EPOLLEXCLUSIVE
:EPOLLEXCLUSIVE
是epoll
的擴(kuò)展選項(xiàng),它允許一個(gè)線程獨(dú)占一個(gè)epoll
實(shí)例,從而避免了epoll
的驚群?jiǎn)栴};
(2)使用EPOLLONESHOT
:對(duì)于注冊(cè)了EPOLLONESHOT
事件的文件描述符,操作系統(tǒng)最多觸發(fā)一個(gè)可讀,可寫(xiě)或者異常事件,且觸發(fā)一次,這樣就能確保一個(gè)線程獲取事件并處理,但是需要注意的是對(duì)于監(jiān)聽(tīng)類型(如accept
)不能使用EPOLLONESHOT
,否則就不能持續(xù)監(jiān)聽(tīng)連接,對(duì)于處理完了的非監(jiān)聽(tīng)事件,需要重置EPOLLONESHOT
;
第一部分:信號(hào)
1、發(fā)送信號(hào)給進(jìn)程
#include < sys/types.h >
#include < signal.h >
int kill(pid_t pid, int sig);
2、信號(hào)回調(diào)函數(shù)
#include < signal.h >
typedef void (&__sighandler_t) (int);
__sighandler_t signal(int sig, __sighandler_t _handler);
int sigaction(int sig, const struct sigaction *act, struct sigaction *oact);
(1)__sighandler_t
信號(hào)處理的函數(shù)指針,其中處理參數(shù)為觸發(fā)信號(hào)當(dāng)前值,其中有兩個(gè)默認(rèn)宏(SIG_DFL:使用信號(hào)默認(rèn)處理,SIG_IGN:忽略目標(biāo)信號(hào));
(2)signal
注冊(cè)信號(hào)回調(diào)處理函數(shù),返回值為一個(gè)函數(shù)指針,含義是這個(gè)信號(hào)上一次處理的回調(diào)函數(shù)或者是系統(tǒng)默認(rèn)的處理函數(shù),這里目的是讓用戶可以自己恢復(fù)信號(hào)處理方式,比如系統(tǒng)對(duì)于一些信號(hào)是殺掉進(jìn)程的,這里就應(yīng)該處理完自己的回調(diào)邏輯后再調(diào)用系統(tǒng)默認(rèn)行為;
(3)sigaction
函數(shù)的功能是檢查或修改與指定信號(hào)相關(guān)聯(lián)的處理動(dòng)作,使用樣例如下:
#include < stdio.h >
#include < unistd.h >
#include < stdlib.h >
#include < signal.h >
int main()
{
struct sigaction newact, oldact;
newact.sa_handler = SIG_IGN; // 設(shè)置信號(hào)忽略,也可以設(shè)置為處理函數(shù)
sigemptyset(&newact.sa_mask);
newact.sa_flags = 0;
int count = 0;
pid_t pid = 0;
sigaction(SIGINT, &newact, &oldact); // 原始的備份到oldact,為后續(xù)的處理恢復(fù)
pid = fork();
if (pid == 0)
{
while(1)
{
printf("child exec ...n");
sleep(1);
}
return 0;
}
while (1)
{
if (count++ > 3)
{
sigaction(SIGINT, &oldact, NULL); // 恢復(fù)父進(jìn)程信號(hào)處理方式
kill(pid, SIGKILL); // 父進(jìn)程發(fā)信號(hào)給子進(jìn)程
}
printf("father exec ...n");
sleep(1);
}
return 0;
}
第二部分:定時(shí)器
在Linux網(wǎng)絡(luò)編程中,定時(shí)器的作用主要是管理定時(shí)任務(wù),處理過(guò)期連接,檢測(cè)超時(shí)隊(duì)列等,那我們可以通過(guò)哪些方式實(shí)現(xiàn)定時(shí)器呢?
1、利用系統(tǒng)API
...
setsockopt(socketfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, len);
setsockopt(socketfd, SOL_SOCKET, SO_RCVTIMEO, &timeout, len);
int number = epoll_wait(fd, events, MAX_EVENT_NUMBER, timeout);
...
通過(guò)使用socket的參數(shù),設(shè)置連接句柄的發(fā)送和接收數(shù)據(jù)超時(shí)時(shí)間,可以實(shí)現(xiàn)定時(shí)處理:
(1)SO_SNDTIMEO
發(fā)送數(shù)據(jù)超時(shí)時(shí)間,根據(jù)timeout
設(shè)置;
(2)SO_RCVTIMEO
接收數(shù)據(jù)超時(shí)時(shí)間,根據(jù)timeout
設(shè)置;
IO復(fù)用的參數(shù)中都帶了一個(gè)timeout
參數(shù),可以設(shè)置來(lái)達(dá)到定時(shí)觸發(fā)分支邏輯,比如epoll_wait
;
2、簡(jiǎn)單的定時(shí)器
(1)啟動(dòng)一個(gè)線程實(shí)現(xiàn)定時(shí)器,具體實(shí)現(xiàn)如下圖:
- 主線程啟動(dòng),開(kāi)始執(zhí)行任務(wù),這里可以是網(wǎng)絡(luò)收發(fā)或者其他;
- 啟動(dòng)一個(gè)線程,做定時(shí)任務(wù)處理使用;
- 主線程需要增加定時(shí)任務(wù),可以將任務(wù)封裝為task,添加到任務(wù)隊(duì)列中;
- 同時(shí)通知定時(shí)線程,隊(duì)列中有任務(wù)了,這里通知機(jī)制可以是信號(hào)量或者廣播方式;
- 定時(shí)線程取出隊(duì)列中任務(wù),判斷當(dāng)前任務(wù)是否過(guò)期,如果過(guò)期就執(zhí)行,沒(méi)有過(guò)期就繼續(xù)放入任務(wù)隊(duì)列中,同時(shí)這里需要讓線程等待隊(duì)列中距離下一個(gè)周期最短的時(shí)間,繼續(xù)取隊(duì)列任務(wù);
(2)使用epoll_wait
設(shè)置timeout,是在網(wǎng)絡(luò)事件觸發(fā)的定時(shí)器中最方便的方式,具體邏輯如下:
...
start_timer = ... // 開(kāi)始執(zhí)行時(shí)間
while (true) {
int number = epoll_wait(epfd, events, MAX_EVENT_NUMBER, timeout);
for (...) {
...
// 處理連接任務(wù)
...
}
end_timer = ... // epoll_wait返回并處理任務(wù)時(shí)間
// 處理定時(shí)任務(wù),判斷當(dāng)前時(shí)間是否在一個(gè)timeout
if (end_timer - start_timer > timeout) { // 這里是偽代碼,具體時(shí)間判斷可以參考linux結(jié)構(gòu)體
...
// 啟動(dòng)線程執(zhí)行定時(shí)任務(wù)邏輯
...
}
}
3、時(shí)間輪
時(shí)間輪是一種高效定時(shí)器,通過(guò)類似圓盤(pán)的形式定義每個(gè)tick,定時(shí)轉(zhuǎn)動(dòng)圓盤(pán),假設(shè)每次tick時(shí)間為si
,一個(gè)時(shí)間輪有N個(gè)tick,那么執(zhí)行轉(zhuǎn)動(dòng)一圈時(shí)間為N*si
;
現(xiàn)在插入一個(gè)任務(wù),需要to1
時(shí)間周期后執(zhí)行,這里就分情況處理:
(1)如果to1
< N*si
,則需要分配到(當(dāng)前時(shí)間輪的位置 + to1
/ si
)的位置上,等待自然tick到達(dá)執(zhí)行當(dāng)前to1
的定時(shí)任務(wù);
(2)如果to1
> N*si
,則需要分配到(當(dāng)前時(shí)間輪的位置 + (to1
% N) / si
+ N)的位置上,由于to1
執(zhí)行時(shí)間超過(guò)一輪的周期,所以需要等待多輪轉(zhuǎn)動(dòng)后才能執(zhí)行,那如何處理呢?因此我們將每個(gè)輪的tick上掛一個(gè)鏈表,這個(gè)鏈表的節(jié)點(diǎn)表示到達(dá)這個(gè)tick需要執(zhí)行的任務(wù)to1
,這里的節(jié)點(diǎn)有可能是大于一個(gè)輪轉(zhuǎn)動(dòng)的事件周期,也可能就是當(dāng)前輪時(shí)間周期內(nèi)執(zhí)行,我們只需要當(dāng)事件到達(dá)tick時(shí),取出鏈表遍歷鏈表節(jié)點(diǎn)to1
,判斷是否是當(dāng)前事件周期內(nèi)執(zhí)行,如果是摘除鏈表節(jié)點(diǎn)然后執(zhí)行任務(wù),如果不是則重新計(jì)算to1
需要多久后執(zhí)行,計(jì)算方法就和上面的一樣(當(dāng)前時(shí)間輪位置 + ((to1
- 鏈表最小的周期時(shí)間) % N) / si
+ N),然后將當(dāng)前鏈表節(jié)點(diǎn)重新放回;
事件輪
4、時(shí)間堆
堆的數(shù)據(jù)結(jié)構(gòu)應(yīng)該大家都比較熟悉了,堆是一種滿足以下條件的樹(shù):
- 堆中某個(gè)節(jié)點(diǎn)的值總是不大于或不小于其父節(jié)點(diǎn)的值;
- 堆總是一棵完全二叉樹(shù);
- 添加堆節(jié)點(diǎn)的時(shí)間復(fù)雜度O(lgn),刪除節(jié)點(diǎn)是O(lgn),獲取節(jié)點(diǎn)是O(1);
時(shí)間堆
(1)循環(huán)線程讀取最小時(shí)間堆的堆頂元素;
(2)取出最小節(jié)點(diǎn),判斷當(dāng)前事件是否過(guò)期,如果過(guò)期則繼續(xù)執(zhí)行,否則不處理;
(3)將最小節(jié)點(diǎn)對(duì)應(yīng)的事件丟給執(zhí)行線程執(zhí)行;
這里最小時(shí)間堆節(jié)點(diǎn)在代碼實(shí)現(xiàn)中可以用一個(gè)數(shù)組表示,使用完全二叉樹(shù)的排列。
#include< iostream >
void heapify(int arr[], int n, int i) {
if (i >= n) return;
int min_node = i;
int lson = i * 2 + 1;
int rson = i * 2 + 2;
if (lson < n && arr[min_node] > arr[lson]) { // 和左孩子比較,找到最小節(jié)點(diǎn)
min_node = lson;
}
if (rson < n && arr[min_node] > arr[rson]) { // 和右孩子比較,找到最小節(jié)點(diǎn)
min_node = rson;
}
if (min_node != i) {
swap(arr[min_node], arr[i]);
heapify(arr, n, min_node); // 遞歸處理
}
}
void heapSort(int arr[], int n) {
// 反向取出最后一個(gè)節(jié)點(diǎn)
int lastNode = n - 1;
int parent = (lastNode - 1) / 2;
for (int i = parent; i >= 0; i--) {
heapify(arr, n, i);
}
for (int i = n - 1; i >= 0; i--) {
swap(arr[i], arr[0]);
heapify(arr, i, 0); // 調(diào)整堆節(jié)點(diǎn)
}
}
int main() {
int arr[5] = { 70, 41, 10, 90, 18, 26 };
heap_sort(arr, sizeof(arr) / sizeof(arr[0]));
for (int i = 0; i < sizeof(arr) / sizeof(arr[0]); i++) {
cout < < arr[i] < < endl;
}
return 0;
}
評(píng)論
查看更多