什么是select?
有的朋友可能對select也不是很了解啊,我這里稍微科普一下:網(wǎng)絡(luò)連接,服務(wù)器也是通過文件描述符來管理這些連接上來的客戶端,既然是供連接的服務(wù)器,那就免不了要接收來自客戶端的消息。那么多臺客戶端,消息那么的多,要是漏了一條兩條重要消息,那也不要用TCP了,那怎么辦?
前輩們就是有辦法,輪詢,輪詢每個客戶端文件描述符,查看他們是否帶著消息,如果帶著,那就處理一下;如果沒帶著,那就一邊等著去。這就是select,輪詢,頗有點領(lǐng)導(dǎo)下基層的那種感覺哈。
但是這個select的輪詢吶,會有個問題,明眼人一下就能想到,那即是耗費資源啊,耗費什么資源,時間吶,慢吶(其實也挺快了,不過相對epoll來說就是慢)。 再認真想一下,還浪費什么資源,系統(tǒng)資源。有的客戶端吶,占著那啥玩意兒不干那啥事兒,這種客戶端吶,還不少。這也怪不得人家,哪兒有客戶端時時刻刻在發(fā)消息,要是有,那就要小心是不是惡意攻擊了。那把這么一堆偶爾動一下的客戶端的文件描述符一直攥手里,累不累?能一次攥多少個?就像一個老板,一直想著下去巡視,那他可以去當車間組長了哈哈哈。
所以,select的默認上限一般是1024(FD_SETSIZE),當然我們可以手動去改,但是人家給個1024自然有人家的道理,改太大的話系統(tǒng)在這一塊的負載就大了。 那句話怎么說的來著,你每次對系統(tǒng)的索取,其實都早已明碼標價!哈哈哈。。。
所以,我們選用epoll模型。
什么是epoll?
epoll接口是為解決Linux內(nèi)核處理大量文件描述符而提出的方案。該接口屬于Linux下多路I/O復(fù)用接口中select/poll的增強。其經(jīng)常應(yīng)用于Linux下高并發(fā)服務(wù)型程序,特別是在大量并發(fā)連接中只有少部分連接處于活躍下的情況 (通常是這種情況),在該情況下能顯著的提高程序的CPU利用率。
前面說,select就像親自下基層視察的老板,那么epoll這個老板就要顯得精明的多了。他可不親自下基層,他找了個美女秘書,他只要盯著他的秘書看就行了,呸,他只需要聽取他的秘書的匯報就行了。匯報啥呢?基層有任何消息,跟秘書說,秘書匯總之后一次性交給老板來處理。這樣老板的時間不就大大的提高了嘛。
如果你學(xué)過設(shè)計模式,這就是典型的“命令模式”,非常符合“依賴倒置原則”,這是一個非常美妙的模式,這個原則也是我最喜歡的一個原則,將高層實現(xiàn)與低層實現(xiàn)解耦合,從而可以各自開發(fā),只要接口一致便可,這個接口,就是秘書。
扯遠了,如果對“設(shè)計模式”有興趣,可以找我的專欄。
好,言歸正傳哈哈哈。
epoll的設(shè)計思路
- (1)epoll在Linux內(nèi)核中構(gòu)建了一個文件系統(tǒng),該文件系統(tǒng)采用紅黑樹來構(gòu)建,紅黑樹在增加和刪除上面的效率極高,因此是epoll高效的原因之一。有興趣可以百度紅黑樹了解,但在這里你只需知道其算法效率超高即可。
- (2)epoll提供了兩種觸發(fā)模式,水平觸發(fā)(LT)和邊沿觸發(fā)(ET)。當然,涉及到I/O操作也必然會有阻塞和非阻塞兩種方案。目前效率相對較高的是 epoll+ET+非阻塞I/O 模型,在具體情況下應(yīng)該合理選用當前情形中最優(yōu)的搭配方案。
- (3)epoll所支持的FD上限是最大可以打開文件的數(shù)目,這個數(shù)字一般遠大于1024,舉個例子,在1GB內(nèi)存的機器上大約是10萬左右,具體數(shù)目可以下面語句查看,一般來說這個數(shù)目和系統(tǒng)內(nèi)存關(guān)系很大。
阻塞I/O與非阻塞I/O
為了方便理解后面的內(nèi)容,我們先看幾張圖,關(guān)于阻塞與非阻塞I/O的。
阻塞式文件I/O
非阻塞式文件I/O
多路復(fù)用I/O
好,有了上面這幾張圖墊著,咱來看看邊緣觸發(fā)和水平觸發(fā)。
邊緣觸發(fā) VS 水平觸發(fā)
EPOLL 事件有兩種模型: Edge Triggered (ET) 邊緣觸發(fā) 只有新數(shù)據(jù)到來,才觸發(fā),不管緩存區(qū)中是否還有數(shù)據(jù)。 Level Triggered (LT) 水平觸發(fā) 只要有數(shù)據(jù)都會觸發(fā),不管數(shù)據(jù)是哪里的。 (這樣表述會不會好理解一些)
LT(level triggered) 是 缺省 的工作方式 ,并且同時支持 block 和 no-block socket. 在這種做法中,內(nèi)核告訴你一個文件描述符是否就緒了,然后你可以對這個就緒的 fd 進行 IO 操作。如果你不作任何操作,內(nèi)核還是會繼續(xù)通知你的,所以,這種模式編程出錯誤可能性要小一點。傳統(tǒng)的 select/poll 都是這種模型的代表.
ET(edge-triggered) 是高速工作方式 ,只支持 no-block socket 。在這種模式下,當描述符從未就緒變?yōu)榫途w時,內(nèi)核通過 epoll 告訴你。然后它會假設(shè)你知道文件描述符已經(jīng)就緒,并且不會再為那個文件描述符發(fā)送更多的就緒通知,直到你做了某些操作導(dǎo)致那個文件描述符不再為就緒狀態(tài)了 ( 比如,你在發(fā)送,接收或者接收請求,或者發(fā)送接收的數(shù)據(jù)少于一定量時導(dǎo)致了一個 EWOULDBLOCK 錯誤)。但是請注意,如果一直不對這個 fd 作 IO 操作 ( 從而導(dǎo)致它再次變成未就緒 ) ,內(nèi)核不會發(fā)送更多的通知 (only once), 不過在 TCP 協(xié)議中, ET 模式的加速效用仍需要更多的 benchmark 確認。
要設(shè)置ET:在epoll_ctl函數(shù)中配置上EPOLLET即可
epoll 工作在 ET 模式的時候,必須使用非阻塞套接口,以避免由于一個文件句柄的阻塞讀 / 阻塞寫操作把處理多個文件描述符的任務(wù)餓死。最好以下面的方式調(diào)用 ET 模式的 epoll 接口,在后面會介紹避免可能的缺陷。
- 基于非阻塞文件句柄
- 只有當 read(2) 或者 write(2) 返回 EAGAIN 時才需要掛起,等待。但這并不是說每次 read() 時都需要循環(huán)讀,直到讀到產(chǎn)生一個 EAGAIN 才認為此次事件處理完成,當 read() 返回的讀到的數(shù)據(jù)長度小于請求的數(shù)據(jù)長度時,就可以確定此時緩沖中已沒有數(shù)據(jù)了,也就可以認為此事讀事件已處理完成。
epoll API
epoll提供的API,我所用過的其實不多,無非就那么幾個。 所以我就只能聊聊我說用過的。
頭文件
#include< sys/epoll.h >
創(chuàng)建句柄
int epoll_create(int size);
創(chuàng)建一個epoll句柄,參數(shù)size用于告訴內(nèi)核監(jiān)聽的文件描述符個數(shù),跟內(nèi)存大小有關(guān)。 返回epoll 文件描述符
控制某個epoll監(jiān)控的文件描述符上的事件:注冊,修改,刪除
參數(shù)釋義: epfd:為epoll的句柄 op:表示動作,用3個宏來表示 ··· EPOLL_CTL_ADD(注冊新的 fd 到epfd) ··· EPOLL_CTL_DEL(從 epfd 中刪除一個 fd) ··· EPOLL_CTL_MOD(修改已經(jīng)注冊的 fd 監(jiān)聽事件)
event:告訴內(nèi)核需要監(jiān)聽的事件
typedef union epoll_data
{
void* ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t; /* 保存觸發(fā)事件的某個文件描述符相關(guān)的數(shù)據(jù) */
struct epoll_event
{
__uint32_t events; /* epoll event */
epoll_data_t data; /* User data variable */
};
/* epoll_event.events:
EPOLLIN 表示對應(yīng)的文件描述符可以讀
EPOLLOUT 表示對應(yīng)的文件描述符可以寫
EPOLLPRI 表示對應(yīng)的文件描述符有緊急的數(shù)據(jù)可讀
EPOLLERR 表示對應(yīng)的文件描述符發(fā)生錯誤
EPOLLHUP 表示對應(yīng)的文件描述符被掛斷
EPOLLET 設(shè)置ET模式
*/
epoll消息讀取
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
等待所監(jiān)控文件描述符上有事件的產(chǎn)生
參數(shù)釋義: events:用來從內(nèi)核得到事件的集合 maxevent:用于告訴內(nèi)核這個event有多大,這個maxevent不能大于創(chuàng)建句柄時的size timeout:超時時間 ··· -1:阻塞 ··· 0:立即返回 ···>0:指定微秒
成功返回有多少個文件描述符準備就緒,時間到返回0,出錯返回-1.
代碼示例
/* 實現(xiàn)功能:通過epoll, 處理多個socket
* 監(jiān)聽一個端口,監(jiān)聽到有鏈接時,添加到epoll_event
* xs
*/
#include < stdio.h >
#include < stdlib.h >
#include < string.h >
#include < sys/socket.h >
#include < poll.h >
#include < sys/epoll.h >
#include < sys/time.h >
#include < netinet/in.h >
#include < unistd.h >
#define MYPORT 12345
//最多處理的connect
#define MAX_EVENTS 500
//當前的連接數(shù)
int currentClient = 0;
//數(shù)據(jù)接受 buf
#define REVLEN 10
char recvBuf[REVLEN];
//epoll描述符
int epollfd;
//事件數(shù)組
struct epoll_event eventList[MAX_EVENTS];
void AcceptConn(int srvfd);
void RecvData(int fd);
int main()
{
int i, ret, sinSize;
int recvLen = 0;
fd_set readfds, writefds;
int sockListen, sockSvr, sockMax;
int timeout;
struct sockaddr_in server_addr;
struct sockaddr_in client_addr;
//socket
if((sockListen=socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
printf("socket errorn");
return -1;
}
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(MYPORT);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
//bind
if(bind(sockListen, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0)
{
printf("bind errorn");
return -1;
}
//listen
if(listen(sockListen, 5) < 0)
{
printf("listen errorn");
return -1;
}
// epoll 初始化
epollfd = epoll_create(MAX_EVENTS);
struct epoll_event event;
event.events = EPOLLIN|EPOLLET;
event.data.fd = sockListen;
//add Event
if(epoll_ctl(epollfd, EPOLL_CTL_ADD, sockListen, &event) < 0)
{
printf("epoll add fail : fd = %dn", sockListen);
return -1;
}
//epoll
while(1)
{
timeout=3000;
//epoll_wait
int ret = epoll_wait(epollfd, eventList, MAX_EVENTS, timeout);
if(ret < 0)
{
printf("epoll errorn");
break;
}
else if(ret == 0)
{
printf("timeout ...n");
continue;
}
//直接獲取了事件數(shù)量,給出了活動的流,這里是和poll區(qū)別的關(guān)鍵
int i = 0;
for(i=0; i< ret; i++)
{
//錯誤退出
if ((eventList[i].events & EPOLLERR) ||
(eventList[i].events & EPOLLHUP) ||
!(eventList[i].events & EPOLLIN))
{
printf ( "epoll errorn");
close (eventList[i].data.fd);
return -1;
}
if (eventList[i].data.fd == sockListen)
{
AcceptConn(sockListen);
}else{
RecvData(eventList[i].data.fd);
}
}
}
close(epollfd);
close(sockListen);
return 0;
}
/**************************************************
函數(shù)名:AcceptConn
功能:接受客戶端的鏈接
參數(shù):srvfd:監(jiān)聽SOCKET
***************************************************/
void AcceptConn(int srvfd)
{
struct sockaddr_in sin;
socklen_t len = sizeof(struct sockaddr_in);
bzero(&sin, len);
int confd = accept(srvfd, (struct sockaddr*)&sin, &len);
if (confd < 0)
{
printf("bad acceptn");
return;
}else
{
printf("Accept Connection: %d", confd);
}
//將新建立的連接添加到EPOLL的監(jiān)聽中
struct epoll_event event;
event.data.fd = confd;
event.events = EPOLLIN|EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, confd, &event);
}
//讀取數(shù)據(jù)
void RecvData(int fd)
{
int ret;
int recvLen = 0;
memset(recvBuf, 0, REVLEN);
printf("RecvData functionn");
if(recvLen != REVLEN)
{
while(1)
{
//recv數(shù)據(jù)
ret = recv(fd, (char *)recvBuf+recvLen, REVLEN-recvLen, 0);
if(ret == 0)
{
recvLen = 0;
break;
}
else if(ret < 0)
{
recvLen = 0;
break;
}
//數(shù)據(jù)接受正常
recvLen = recvLen+ret;
if(recvLen< REVLEN)
{
continue;
}
else
{
//數(shù)據(jù)接受完畢
printf("buf = %sn", recvBuf);
recvLen = 0;
break;
}
}
}
printf("data is %s", recvBuf);
}
整體拔高:高效的并發(fā)方式
并發(fā)編程的目的是讓程序”同時”執(zhí)行多個任務(wù)。如果程序是計算密集型的,并發(fā)編程并沒有什么優(yōu)勢,反而由于任務(wù)的切換使效率降低。但如果程序是I/O密集型的,那就不同了。
并發(fā)模式是指I/O處理單元和多個邏輯單元之間協(xié)調(diào)完成任務(wù)的方法,服務(wù)器主要有兩種并發(fā)編程模式:半同步/半異步(half-sync/half-async)模式和領(lǐng)導(dǎo)者/追隨者(Leader/Followers)模式。
這里講一個“半同步/半異步”。
下面的內(nèi)容需要有一定的基礎(chǔ)了,小白可以收藏一下以后變強了再看。
半同步/半異步模式
在半同步/半異步模式中,同步線程用于處理客戶邏輯,異步線程用于處理I/O事件。異步線程監(jiān)聽到客戶請求之后就將其封裝成請求對象并插入到請求隊列中。請求隊列將通知某個工作在同步模式的工作線程來讀取并處理該請求對象。
半同步/半反應(yīng)堆模式(half-sync/half-reactive模式)
半同步/半反應(yīng)堆模式是半同步/半異步模式的一種變體。 其結(jié)構(gòu)如下圖:
在上圖中,異步線程只有一個,由主線程充當,負責監(jiān)聽socket上的事件。如果監(jiān)聽socket上有新的連接請求到來,主線程就接受新的連接socket,然后往epoll內(nèi)核事件表中注冊該socket上的讀寫事件。如果連接socket上有讀寫事件發(fā)生,即有新的客戶請求到來或有數(shù)據(jù)要發(fā)送至客戶端,主線程就將該連接socket插入到請求隊列中,所有工作線程都睡眠在請求隊列上,當有任務(wù)到來時,他們通過競爭來獲取任務(wù)的接管權(quán)。 由于主線程插入請求隊列中的任務(wù)是就緒的連接socket,所以該半同步/半反應(yīng)堆模式所采用的事件處理模式是Reactor模式,即工作線程要自己從socket上讀寫數(shù)據(jù)。當然,半同步/半反應(yīng)堆模式也可以用模擬的Proactor事件處理模式,即由主線程來完成數(shù)據(jù)的讀寫操作,此時主線程將應(yīng)用程序數(shù)據(jù)、任務(wù)類型等信息封裝為一個任務(wù)對象,然后將其插入到請求隊列。
半同步/半反應(yīng)堆模式的缺點: 主線程和工作線程共享請求隊列,因而請求隊列是臨界資源,所以對請求隊列操作的時候需要加鎖保護。 每個工作線程在同一時間只能處理一個客戶請求。如果客戶數(shù)量增多,則請求隊列中堆積任務(wù)太多,客戶端的響應(yīng)會越來越慢。如果增多工作線程的話,則線程的切花也將消耗大量的CPU時間。
高效的半同步/半異步模式
在半同步/半反應(yīng)堆模式中,每個工作線程同時只能處理一個客戶請求,如果并發(fā)量大的話,客戶端響應(yīng)會很慢。如果每個工作線程都能同時處理多個客戶鏈接,則就能改善這種情況,所以就有了高效的半同步/半異步模式。 其結(jié)構(gòu)如圖:
主線程只管監(jiān)聽socket,當有新的連接socket到來時,主線程就接受連接并返回新的連接socket給某個工作線程。此后該新連接socket上的任何I/O操作都由被選中的工作線程來處理,直到客戶端關(guān)閉連接。當工作線程檢測到有新的連接socket到來時,就把該新的連接socket的讀寫事件注冊到自己的epoll內(nèi)核事件表中。 主線程和工作線程都維持自己的事件循環(huán),他們各自獨立的監(jiān)聽不同事件。因此在這種高效的半同步/半異步模式中,每個線程都工作在異步模式中,所以它并非嚴格意義上的半同步/半異步模式。
epoll源碼學(xué)習
數(shù)據(jù)結(jié)構(gòu)
eventpoll
// epoll的核心實現(xiàn)對應(yīng)于一個epoll描述符
struct eventpoll {
spinlock_t lock;
struct mutex mtx;
wait_queue_head_t wq; // sys_epoll_wait() 等待在這里
// f_op- >poll() 使用的, 被其他事件通知機制利用的wait_address
wait_queue_head_t poll_wait;
//已就緒的需要檢查的epitem 列表
struct list_head rdllist;
//保存所有加入到當前epoll的文件對應(yīng)的epitem
struct rb_root rbr;
// 當正在向用戶空間復(fù)制數(shù)據(jù)時, 產(chǎn)生的可用文件
struct epitem *ovflist;
/* The user that created the eventpoll descriptor */
struct user_struct *user;
struct file *file;
//優(yōu)化循環(huán)檢查,避免循環(huán)檢查中重復(fù)的遍歷
int visited;
struct list_head visited_list_link;
}
epitem
// 對應(yīng)于一個加入到epoll的文件
struct epitem {
// 掛載到eventpoll 的紅黑樹節(jié)點
struct rb_node rbn;
// 掛載到eventpoll.rdllist 的節(jié)點
struct list_head rdllink;
// 連接到ovflist 的指針
struct epitem *next;
/* 文件描述符信息fd + file, 紅黑樹的key */
struct epoll_filefd ffd;
/* Number of active wait queue attached to poll operations */
int nwait;
// 當前文件的等待隊列(eppoll_entry)列表
// 同一個文件上可能會監(jiān)視多種事件,
// 這些事件可能屬于不同的wait_queue中
// (取決于對應(yīng)文件類型的實現(xiàn)),
// 所以需要使用鏈表
struct list_head pwqlist;
// 當前epitem 的所有者
struct eventpoll *ep;
/* List header used to link this item to the "struct file" items list */
struct list_head fllink;
/* epoll_ctl 傳入的用戶數(shù)據(jù) */
struct epoll_event event;
};
eppoll_entry
// 與一個文件上的一個wait_queue_head 相關(guān)聯(lián),因為同一文件可能有多個等待的事件,
//這些事件可能使用不同的等待隊列
struct eppoll_entry {
// List struct epitem.pwqlist
struct list_head llink;
// 所有者
struct epitem *base;
// 添加到wait_queue 中的節(jié)點
wait_queue_t wait;
// 文件wait_queue 頭
wait_queue_head_t *whead;
};
函數(shù)接口
epoll_create()
//先進行判斷size是否 >=0,若是則直接調(diào)用epoll_create1
SYSCALL_DEFINE1(epoll_create, int, size)
{
if (size <= 0)
return -EINVAL;
return sys_epoll_create1(0);
}
SYSCALL_DEFINE1是一個宏,用于定義有一個參數(shù)的系統(tǒng)調(diào)用函數(shù),上述宏展開后即成為: int sys_epoll_create(int size),這就是epoll_create系統(tǒng)調(diào)用的入口。至于為何要用宏而不是直接聲明,主要是因為系統(tǒng)調(diào)用的參數(shù)個數(shù)、傳參方式都有嚴格限制,最多六個參數(shù)。
/* 這才是真正的epoll_create啊~~ */
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error;
struct eventpoll *ep = NULL;//主描述符
/* Check the EPOLL_* constant for consistency. */
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
/* 對于epoll來講, 目前唯一有效的flag就是CLOEXEC */
if (flags & ~EPOLL_CLOEXEC)
return -EINVAL;
/*
* Create the internal data structure ("struct eventpoll").
*/
/* 分配一個struct eventpoll */
error = ep_alloc(&ep);
if (error < 0)
return error;
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
/* 這里是創(chuàng)建一個匿名fd。
epollfd本身并不存在一個真正的文件與之對應(yīng), 所以內(nèi)核需要創(chuàng)建一個
"虛擬"的文件, 并為之分配真正的struct file結(jié)構(gòu), 而且有真正的fd.
這里2個參數(shù)比較關(guān)鍵:
eventpoll_fops, fops就是file operations, 就是當你對這個文件(這里是虛擬的)進行操作(比如讀)時,
fops里面的函數(shù)指針指向真正的操作實現(xiàn), 類似C++里面虛函數(shù)和子類的概念.
epoll只實現(xiàn)了poll和release(就是close)操作, 其它文件系統(tǒng)操作都有VFS全權(quán)處理了.
ep, ep就是struct epollevent, 它會作為一個私有數(shù)據(jù)保存在struct file的private指針里面.
*/
error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
if (error < 0)
ep_free(ep);
return error;
// epoll 文件系統(tǒng)的相關(guān)實現(xiàn)
// epoll 文件系統(tǒng)初始化, 在系統(tǒng)啟動時會調(diào)用
static int __init eventpoll_init(void)
{
struct sysinfo si;
si_meminfo(&si);
// 限制可添加到epoll的最多的描述符數(shù)量
max_user_watches = (((si.totalram - si.totalhigh) / 25) < < PAGE_SHIFT) /
EP_ITEM_COST;
BUG_ON(max_user_watches < 0);
// 初始化遞歸檢查隊列
ep_nested_calls_init(&poll_loop_ncalls);
ep_nested_calls_init(&poll_safewake_ncalls);
ep_nested_calls_init(&poll_readywalk_ncalls);
// epoll 使用的slab分配器分別用來分配epitem和eppoll_entry
epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
0, SLAB_HWCACHE_ALIGN | SLAB_PANIC, NULL);
pwq_cache = kmem_cache_create("eventpoll_pwq",
sizeof(struct eppoll_entry), 0, SLAB_PANIC, NULL);
return 0;
}
epoll_ctl()
//創(chuàng)建好epollfd后, 接下來添加fd
//epoll_ctl的參數(shù):epfd 表示epollfd;op 有ADD,MOD,DEL,
//fd 是需要監(jiān)聽的描述符,event 我們感興趣的events
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
int error;
int did_lock_epmutex = 0;
struct file *file, *tfile;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
error = -EFAULT;
//錯誤處理以及從用戶空間將epoll_event結(jié)構(gòu)copy到內(nèi)核空間.
if (ep_op_has_event(op) &&
// 復(fù)制用戶空間數(shù)據(jù)到內(nèi)核
copy_from_user(&epds, event, sizeof(struct epoll_event))) {
goto error_return;
}
// 取得 epfd 對應(yīng)的文件
error = -EBADF;
file = fget(epfd);
if (!file) {
goto error_return;
}
// 取得目標文件
tfile = fget(fd);
if (!tfile) {
goto error_fput;
}
// 目標文件必須提供 poll 操作
error = -EPERM;
if (!tfile- >f_op || !tfile- >f_op- >poll) {
goto error_tgt_fput;
}
// 添加自身或epfd 不是epoll 句柄
error = -EINVAL;
if (file == tfile || !is_file_epoll(file)) {
goto error_tgt_fput;
}
// 取得內(nèi)部結(jié)構(gòu)eventpoll
ep = file- >private_data;
// EPOLL_CTL_MOD 不需要加全局鎖 epmutex
if (op == EPOLL_CTL_ADD || op == EPOLL_CTL_DEL) {
mutex_lock(&epmutex);
did_lock_epmutex = 1;
}
if (op == EPOLL_CTL_ADD) {
if (is_file_epoll(tfile)) {
error = -ELOOP;
// 目標文件也是epoll 檢測是否有循環(huán)包含的問題
if (ep_loop_check(ep, tfile) != 0) {
goto error_tgt_fput;
}
} else
{
// 將目標文件添加到 epoll 全局的tfile_check_list 中
list_add(&tfile- >f_tfile_llink, &tfile_check_list);
}
}
mutex_lock_nested(&ep- >mtx, 0);
// 以tfile 和fd 為key 在rbtree 中查找文件對應(yīng)的epitem
epi = ep_find(ep, tfile, fd);
error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
// 沒找到, 添加額外添加ERR HUP 事件
epds.events |= POLLERR | POLLHUP;
error = ep_insert(ep, &epds, tfile, fd);
} else {
error = -EEXIST;
}
// 清空文件檢查列表
clear_tfile_check_list();
break;
case EPOLL_CTL_DEL:
if (epi) {
error = ep_remove(ep, epi);
} else {
error = -ENOENT;
}
break;
case EPOLL_CTL_MOD:
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
} else {
error = -ENOENT;
}
break;
}
mutex_unlock(&ep- >mtx);
error_tgt_fput:
if (did_lock_epmutex) {
mutex_unlock(&epmutex);
}
fput(tfile);
error_fput:
fput(file);
error_return:
return error;
}
//ep_insert()在epoll_ctl()中被調(diào)用, 完成往epollfd里面添加一個監(jiān)聽fd的工作
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
int error, revents, pwake = 0;
unsigned long flags;
long user_watches;
struct epitem *epi;
struct ep_pqueue epq;
/*
struct ep_pqueue {
poll_table pt;
struct epitem *epi;
};
*/
// 增加監(jiān)視文件數(shù)
user_watches = atomic_long_read(&ep- >user- >epoll_watches);
if (unlikely(user_watches >= max_user_watches)) {
return -ENOSPC;
}
// 分配初始化 epi
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) {
return -ENOMEM;
}
INIT_LIST_HEAD(&epi- >rdllink);
INIT_LIST_HEAD(&epi- >fllink);
INIT_LIST_HEAD(&epi- >pwqlist);
epi- >ep = ep;
// 初始化紅黑樹中的key
ep_set_ffd(&epi- >ffd, tfile, fd);
// 直接復(fù)制用戶結(jié)構(gòu)
epi- >event = *event;
epi- >nwait = 0;
epi- >next = EP_UNACTIVE_PTR;
// 初始化臨時的 epq
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
// 設(shè)置事件掩碼
epq.pt._key = event- >events;
// 內(nèi)部會調(diào)用ep_ptable_queue_proc, 在文件對應(yīng)的wait queue head 上
// 注冊回調(diào)函數(shù), 并返回當前文件的狀態(tài)
revents = tfile- >f_op- >poll(tfile, &epq.pt);
// 檢查錯誤
error = -ENOMEM;
if (epi- >nwait < 0) { // f_op- >poll 過程出錯
goto error_unregister;
}
// 添加當前的epitem 到文件的f_ep_links 鏈表
spin_lock(&tfile- >f_lock);
list_add_tail(&epi- >fllink, &tfile- >f_ep_links);
spin_unlock(&tfile- >f_lock);
// 插入epi 到rbtree
ep_rbtree_insert(ep, epi);
/* now check if we've created too many backpaths */
error = -EINVAL;
if (reverse_path_check()) {
goto error_remove_epi;
}
spin_lock_irqsave(&ep- >lock, flags);
/* 文件已經(jīng)就緒插入到就緒鏈表rdllist */
if ((revents & event- >events) && !ep_is_linked(&epi- >rdllink)) {
list_add_tail(&epi- >rdllink, &ep- >rdllist);
if (waitqueue_active(&ep- >wq))
// 通知sys_epoll_wait , 調(diào)用回調(diào)函數(shù)喚醒sys_epoll_wait 進程
{
wake_up_locked(&ep- >wq);
}
// 先不通知調(diào)用eventpoll_poll 的進程
if (waitqueue_active(&ep- >poll_wait)) {
pwake++;
}
}
spin_unlock_irqrestore(&ep- >lock, flags);
atomic_long_inc(&ep- >user- >epoll_watches);
if (pwake)
// 安全通知調(diào)用eventpoll_poll 的進程
{
ep_poll_safewake(&ep- >poll_wait);
}
return 0;
error_remove_epi:
spin_lock(&tfile- >f_lock);
// 刪除文件上的 epi
if (ep_is_linked(&epi- >fllink)) {
list_del_init(&epi- >fllink);
}
spin_unlock(&tfile- >f_lock);
// 從紅黑樹中刪除
rb_erase(&epi- >rbn, &ep- >rbr);
error_unregister:
// 從文件的wait_queue 中刪除, 釋放epitem 關(guān)聯(lián)的所有eppoll_entry
ep_unregister_pollwait(ep, epi);
spin_lock_irqsave(&ep- >lock, flags);
if (ep_is_linked(&epi- >rdllink)) {
list_del_init(&epi- >rdllink);
}
spin_unlock_irqrestore(&ep- >lock, flags);
// 釋放epi
kmem_cache_free(epi_cache, epi);
return error;
}
static unsigned int ep_eventpoll_poll(struct file *file, poll_table *wait)
{
int pollflags;
struct eventpoll *ep = file- >private_data;
// 插入到wait_queue
poll_wait(file, &ep- >poll_wait, wait);
// 掃描就緒的文件列表, 調(diào)用每個文件上的poll 檢測是否真的就緒,
// 然后復(fù)制到用戶空間
// 文件列表中有可能有epoll文件, 調(diào)用poll的時候有可能會產(chǎn)生遞歸,
// 調(diào)用所以用ep_call_nested 包裝一下, 防止死循環(huán)和過深的調(diào)用
pollflags = ep_call_nested(&poll_readywalk_ncalls, EP_MAX_NESTS,
ep_poll_readyevents_proc, ep, ep, current);
// static struct nested_calls poll_readywalk_ncalls;
return pollflags != -1 ? pollflags : 0;
}
// 通用的poll_wait 函數(shù), 文件的f_ops- >poll 通常會調(diào)用此函數(shù)
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && p- >_qproc && wait_address) {
// 調(diào)用_qproc 在wait_address 上添加節(jié)點和回調(diào)函數(shù)
// 調(diào)用 poll_table_struct 上的函數(shù)指針向wait_address添加節(jié)點, 并設(shè)置節(jié)點的func
// (如果是select或poll 則是 __pollwait, 如果是 epoll 則是 ep_ptable_queue_proc),
p- >_qproc(filp, wait_address, p);
}
}
/*
* 該函數(shù)在調(diào)用f_op- >poll()時會被調(diào)用.
* 也就是epoll主動poll某個fd時, 用來將epitem與指定的fd關(guān)聯(lián)起來的.
* 關(guān)聯(lián)的辦法就是使用等待隊列(waitqueue)
*/
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;
if (epi- >nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
/* 初始化等待隊列, 指定ep_poll_callback為喚醒時的回調(diào)函數(shù),
* 當我們監(jiān)聽的fd發(fā)生狀態(tài)改變時, 也就是隊列頭被喚醒時,
* 指定的回調(diào)函數(shù)將會被調(diào)用. */
init_waitqueue_func_entry(&pwq- >wait, ep_poll_callback);
pwq- >whead = whead;
pwq- >base = epi;
/* 將剛分配的等待隊列成員加入到頭中, 頭是由fd持有的 */
add_wait_queue(whead, &pwq- >wait);
list_add_tail(&pwq- >llink, &epi- >pwqlist);
/* nwait記錄了當前epitem加入到了多少個等待隊列中,
* 我認為這個值最大也只會是1... */
epi- >nwait++;
} else {
/* We have to signal that an error occurred */
epi- >nwait = -1;
}
}
//回調(diào)函數(shù), 當我們監(jiān)聽的fd發(fā)生狀態(tài)改變時, 它會被調(diào)用.
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
unsigned long flags;
//從等待隊列獲取epitem.需要知道哪個進程掛載到這個設(shè)備
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi- >ep;//獲取
spin_lock_irqsave(&ep- >lock, flags);
if (!(epi- >event.events & ~EP_PRIVATE_BITS))
goto out_unlock;
/* 沒有我們關(guān)心的event... */
if (key && !((unsigned long) key & epi- >event.events))
goto out_unlock;
/*
* 這里看起來可能有點費解, 其實干的事情比較簡單:
* 如果該callback被調(diào)用的同時, epoll_wait()已經(jīng)返回了,
* 也就是說, 此刻應(yīng)用程序有可能已經(jīng)在循環(huán)獲取events,
* 這種情況下, 內(nèi)核將此刻發(fā)生event的epitem用一個單獨的鏈表
* 鏈起來, 不發(fā)給應(yīng)用程序, 也不丟棄, 而是在下一次epoll_wait
* 時返回給用戶.
*/
if (unlikely(ep- >ovflist != EP_UNACTIVE_PTR)) {
if (epi- >next == EP_UNACTIVE_PTR) {
epi- >next = ep- >ovflist;
ep- >ovflist = epi;
}
goto out_unlock;
}
/* 將當前的epitem放入ready list */
if (!ep_is_linked(&epi- >rdllink))
list_add_tail(&epi- >rdllink, &ep- >rdllist);
/* 喚醒epoll_wait... */
if (waitqueue_active(&ep- >wq))
wake_up_locked(&ep- >wq);
/* 如果epollfd也在被poll, 那就喚醒隊列里面的所有成員. */
if (waitqueue_active(&ep- >poll_wait))
pwake++;
out_unlock:
spin_unlock_irqrestore(&ep- >lock, flags);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep- >poll_wait);
return 1;
}
epoll_wait()
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
int error;
struct file *file;
struct eventpoll *ep;
/* The maximum number of event must be greater than zero */
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
return -EINVAL;
/* Verify that the area passed by the user is writeable */
/* 這個地方有必要說明一下:
* 內(nèi)核對應(yīng)用程序采取的策略是"絕對不信任",
* 所以內(nèi)核跟應(yīng)用程序之間的數(shù)據(jù)交互大都是copy, 不允許(也時候也是不能...)指針引用.
* epoll_wait()需要內(nèi)核返回數(shù)據(jù)給用戶空間, 內(nèi)存由用戶程序提供,
* 所以內(nèi)核會用一些手段來驗證這一段內(nèi)存空間是不是有效的.
*/
if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
error = -EFAULT;
goto error_return;
}
/* Get the "struct file *" for the eventpoll file */
error = -EBADF;
/* 獲取epollfd的struct file, epollfd也是文件嘛 */
file = fget(epfd);
if (!file)
goto error_return;
error = -EINVAL;
/* 檢查一下它是不是一個真正的epollfd... */
if (!is_file_epoll(file))
goto error_fput;
/* 獲取eventpoll結(jié)構(gòu) */
ep = file- >private_data;
/* 等待事件到來~~ */
error = ep_poll(ep, events, maxevents, timeout);
error_fput:
fput(file);
error_return:
return error;
}
/* 這個函數(shù)真正將執(zhí)行epoll_wait的進程帶入睡眠狀態(tài)... */
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res, eavail;
unsigned long flags;
long jtimeout;
wait_queue_t wait;//等待隊列
/* 計算睡覺時間, 毫秒要轉(zhuǎn)換為HZ */
jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?
MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;
retry:
spin_lock_irqsave(&ep- >lock, flags);
res = 0;
/* 如果ready list不為空, 就不睡了, 直接干活... */
if (list_empty(&ep- >rdllist)) {
/* OK, 初始化一個等待隊列, 準備直接把自己掛起,
* 注意current是一個宏, 代表當前進程 */
init_waitqueue_entry(&wait, current);//初始化等待隊列,wait表示當前進程
__add_wait_queue_exclusive(&ep- >wq, &wait);//掛載到ep結(jié)構(gòu)的等待隊列
for (;;) {
/* 將當前進程設(shè)置位睡眠, 但是可以被信號喚醒的狀態(tài),
* 注意這個設(shè)置是"將來時", 我們此刻還沒睡! */
set_current_state(TASK_INTERRUPTIBLE);
/* 如果這個時候, ready list里面有成員了,
* 或者睡眠時間已經(jīng)過了, 就直接不睡了... */
if (!list_empty(&ep- >rdllist) || !jtimeout)
break;
/* 如果有信號產(chǎn)生, 也起床... */
if (signal_pending(current)) {
res = -EINTR;
break;
}
/* 啥事都沒有,解鎖, 睡覺... */
spin_unlock_irqrestore(&ep- >lock, flags);
/* jtimeout這個時間后, 會被喚醒,
* ep_poll_callback()如果此時被調(diào)用,
* 那么我們就會直接被喚醒, 不用等時間了...
* 再次強調(diào)一下ep_poll_callback()的調(diào)用時機是由被監(jiān)聽的fd
* 的具體實現(xiàn), 比如socket或者某個設(shè)備驅(qū)動來決定的,
* 因為等待隊列頭是他們持有的, epoll和當前進程
* 只是單純的等待...
**/
jtimeout = schedule_timeout(jtimeout);//睡覺
spin_lock_irqsave(&ep- >lock, flags);
}
__remove_wait_queue(&ep- >wq, &wait);
/* OK 我們醒來了... */
set_current_state(TASK_RUNNING);
}
/* Is it worth to try to dig for events ? */
eavail = !list_empty(&ep- >rdllist) || ep- >ovflist != EP_UNACTIVE_PTR;
spin_unlock_irqrestore(&ep- >lock, flags);
/* 如果一切正常, 有event發(fā)生, 就開始準備數(shù)據(jù)copy給用戶空間了... */
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && jtimeout)
goto retry;
return res;
}
//調(diào)用p_scan_ready_list()
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;
esed.maxevents = maxevents;
esed.events = events;
return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
}
//由ep_send_events()調(diào)用本函數(shù)
static int ep_scan_ready_list(struct eventpoll *ep,
int (*sproc)(struct eventpoll *,
struct list_head *, void *),
void *priv)
{
int error, pwake = 0;
unsigned long flags;
struct epitem *epi, *nepi;
LIST_HEAD(txlist);
mutex_lock(&ep- >mtx);
spin_lock_irqsave(&ep- >lock, flags);
/* 這一步要注意, 首先, 所有監(jiān)聽到events的epitem都鏈到rdllist上了,
* 但是這一步之后, 所有的epitem都轉(zhuǎn)移到了txlist上, 而rdllist被清空了,
* 要注意哦, rdllist已經(jīng)被清空了! */
list_splice_init(&ep- >rdllist, &txlist);
/* ovflist, 在ep_poll_callback()里面我解釋過, 此時此刻我們不希望
* 有新的event加入到ready list中了, 保存后下次再處理... */
ep- >ovflist = NULL;
spin_unlock_irqrestore(&ep- >lock, flags);
/* 在這個回調(diào)函數(shù)里面處理每個epitem
* sproc 就是 ep_send_events_proc, 下面會注釋到. */
error = (*sproc)(ep, &txlist, priv);
spin_lock_irqsave(&ep- >lock, flags);
/* 現(xiàn)在我們來處理ovflist, 這些epitem都是我們在傳遞數(shù)據(jù)給用戶空間時
* 監(jiān)聽到了事件. */
for (nepi = ep- >ovflist; (epi = nepi) != NULL;
nepi = epi- >next, epi- >next = EP_UNACTIVE_PTR) {
/* 將這些直接放入readylist */
if (!ep_is_linked(&epi- >rdllink))
list_add_tail(&epi- >rdllink, &ep- >rdllist);
}
ep- >ovflist = EP_UNACTIVE_PTR;
/* 上一次沒有處理完的epitem, 重新插入到ready list */
list_splice(&txlist, &ep- >rdllist);
/* ready list不為空, 直接喚醒... */
if (!list_empty(&ep- >rdllist)) {
if (waitqueue_active(&ep- >wq))
wake_up_locked(&ep- >wq);
if (waitqueue_active(&ep- >poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep- >lock, flags);
mutex_unlock(&ep- >mtx);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep- >poll_wait);
return error;
}
-
服務(wù)器
+關(guān)注
關(guān)注
12文章
9222瀏覽量
85606 -
程序
+關(guān)注
關(guān)注
117文章
3790瀏覽量
81149 -
模型
+關(guān)注
關(guān)注
1文章
3255瀏覽量
48907 -
epoll
+關(guān)注
關(guān)注
0文章
28瀏覽量
2967
發(fā)布評論請先 登錄
相關(guān)推薦
評論