前言
協(xié)程這個(gè)概念很久了,好多程序員是實(shí)現(xiàn)過(guò)這個(gè)組件的,網(wǎng)上關(guān)于協(xié)程的文章,博客,論壇都是汗牛充棟,在知乎,github上面也有很多大牛寫(xiě)了關(guān)于協(xié)程的心得體會(huì)。突發(fā)奇想,我也來(lái)實(shí)現(xiàn)一個(gè)這樣的組件,并測(cè)試了一下性能。借鑒了很多大牛的思想,閱讀了很多大牛的代碼。于是把整個(gè)思考過(guò)程寫(xiě)下來(lái)。實(shí)現(xiàn)代碼
https://github.com/wangbojing/NtyCotyCo
代碼簡(jiǎn)單易讀,如果在你的項(xiàng)目中,NtyCo能夠?yàn)槟憬鉀Q些許工程問(wèn)題,那就榮幸之至。
本文章的設(shè)計(jì)思路,是在每一個(gè)章的最前面以問(wèn)題提出,每章節(jié)的學(xué)習(xí)目的。大家能夠帶著每章的問(wèn)題來(lái)讀每章節(jié)的內(nèi)容,方便讀者能夠方便的進(jìn)入每章節(jié)的思考。讀者讀完以后加上案例代碼閱讀,編譯,運(yùn)行,能夠?qū)ι衩氐膮f(xié)程有一個(gè)全新的理解。能夠運(yùn)用到工程代碼,幫助你更加方便高效的完成工程工作。
第一章 協(xié)程的起源
問(wèn)題:協(xié)程存在的原因?協(xié)程能夠解決哪些問(wèn)題?
在我們現(xiàn)在CS,BS開(kāi)發(fā)模式下,服務(wù)器的吞吐量是一個(gè)很重要的參數(shù)。其實(shí)吞吐量是IO處理時(shí)間加上業(yè)務(wù)處理。為了簡(jiǎn)單起見(jiàn),比如,客戶端與服務(wù)器之間是長(zhǎng)連接的,客戶端定期給服務(wù)器發(fā)送心跳包數(shù)據(jù)??蛻舳税l(fā)送一次心跳包到服務(wù)器,服務(wù)器更新該新客戶端狀態(tài)的。心跳包發(fā)送的過(guò)程,業(yè)務(wù)處理時(shí)長(zhǎng)等于IO讀?。≧ECV系統(tǒng)調(diào)用)加上業(yè)務(wù)處理(更新客戶狀態(tài))。吞吐量等于1s業(yè)務(wù)處理次數(shù)。
業(yè)務(wù)處理(更新客戶端狀態(tài))時(shí)間,業(yè)務(wù)不一樣的,處理時(shí)間不一樣,我們就不做討論。
那如何提升recv的性能。若只有一個(gè)客戶端,recv的性能也沒(méi)有必要提升,也不能提升。若在有百萬(wàn)計(jì)的客戶端長(zhǎng)連接的情況,我們?cè)撊绾翁嵘?。?a href="http://wenjunhu.com/v/tag/538/" target="_blank">Linux為例,在這里需要介紹一個(gè)“網(wǎng)紅”就是epoll。服務(wù)器使用epoll管理百萬(wàn)計(jì)的客戶端長(zhǎng)連接,代碼框架如下:
while (1) {
int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);
for (i = 0;i < nready;i ++) {
int sockfd = events[i].data.fd;
if (sockfd == listenfd) {
int connfd = accept(listenfd, xxx, xxxx);
setnonblock(connfd);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = connfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
} else {
handle(sockfd);
}
}
}
對(duì)于響應(yīng)式服務(wù)器,所有的客戶端的操作驅(qū)動(dòng)都是來(lái)源于這個(gè)大循環(huán)。來(lái)源于epoll_wait的反饋結(jié)果。
對(duì)于服務(wù)器處理百萬(wàn)計(jì)的IO。Handle(sockfd)實(shí)現(xiàn)方式有兩種。
第一種,handle(sockfd)函數(shù)內(nèi)部對(duì)sockfd進(jìn)行讀寫(xiě)動(dòng)作。代碼如下
int handle(int sockfd) {
recv(sockfd, rbuffer, length, 0);
parser_proto(rbuffer, length);
send(sockfd, sbuffer, length, 0);
}
handle的io操作(send,recv)與epoll_wait是在同一個(gè)處理流程里面的。這就是IO同步操作。
優(yōu)點(diǎn):
- sockfd管理方便。
- 操作邏輯清晰。
缺點(diǎn):
- 服務(wù)器程序依賴epoll_wait的循環(huán)響應(yīng)速度慢。
- 程序性能差
第二種,handle(sockfd)函數(shù)內(nèi)部將sockfd的操作,push到線程池中,代碼如下:
int thread_cb(int sockfd) {
// 此函數(shù)是在線程池創(chuàng)建的線程中運(yùn)行。
// 與handle不在一個(gè)線程上下文中運(yùn)行
recv(sockfd, rbuffer, length, 0);
parser_proto(rbuffer, length);
send(sockfd, sbuffer, length, 0);
}
int handle(int sockfd) {
//此函數(shù)在主線程 main_thread 中運(yùn)行
//在此處之前,確保線程池已經(jīng)啟動(dòng)。
push_thread(sockfd, thread_cb); //將sockfd放到其他線程中運(yùn)行。
}
Handle函數(shù)是將sockfd處理方式放到另一個(gè)已經(jīng)其他的線程中運(yùn)行,如此做法,將io操作(recv,send)與epoll_wait 不在一個(gè)處理流程里面,使得io操作(recv,send)與epoll_wait實(shí)現(xiàn)解耦。這就叫做IO異步操作。
優(yōu)點(diǎn):
- 子模塊好規(guī)劃。
- 程序性能高。
缺點(diǎn):
正因?yàn)樽幽K好規(guī)劃,使得模塊之間的sockfd的管理異常麻煩。每一個(gè)子線程都需要管理好sockfd,避免在IO操作的時(shí)候,sockfd出現(xiàn)關(guān)閉或其他異常。
上文有提到IO同步操作,程序響應(yīng)慢,IO異步操作,程序響應(yīng)快。
下面來(lái)對(duì)比一下IO同步操作與IO異步操作。
代碼如下:
https://github.com/wangbojing/c1000k_test/blob/master/server_mulport_epoll.c
在這份代碼的486行,#if 1, 打開(kāi)的時(shí)候,為IO異步操作。關(guān)閉的時(shí)候,為IO同步操作。
接下來(lái)把我測(cè)試接入量的結(jié)果粘貼出來(lái)。
IO異步操作,每1000個(gè)連接接入的服務(wù)器響應(yīng)時(shí)間(900ms左右)。
IO同步操作,每1000個(gè)連接接入的服務(wù)器響應(yīng)時(shí)間(6500ms左右)。
IO異步操作與IO同步操作
對(duì)比項(xiàng)
IO同步操作
IO異步操作
Sockfd管理
管理方便
多個(gè)線程共同管理
代碼邏輯
程序整體邏輯清晰
子模塊邏輯清晰
程序性能
響應(yīng)時(shí)間長(zhǎng),性能差
響應(yīng)時(shí)間短,性能好
有沒(méi)有一種方式,有異步性能,同步的代碼邏輯。來(lái)方便編程人員對(duì)IO操作的組件呢?有,采用一種輕量級(jí)的協(xié)程來(lái)實(shí)現(xiàn)。在每次send或者recv之前進(jìn)行切換,再由調(diào)度器來(lái)處理epoll_wait的流程。
就是采用了基于這樣的思考,寫(xiě)了NtyCo,實(shí)現(xiàn)了一個(gè)IO異步操作與協(xié)程結(jié)合的組件。https://https://github.com/wangbojing/NtyCo
第二章 協(xié)程的案例
問(wèn)題:協(xié)程如何使用?與線程使用有何區(qū)別?
在做網(wǎng)絡(luò)IO編程的時(shí)候,有一個(gè)非常理想的情況,就是每次accept返回的時(shí)候,就為新來(lái)的客戶端分配一個(gè)線程,這樣一個(gè)客戶端對(duì)應(yīng)一個(gè)線程。就不會(huì)有多個(gè)線程共用一個(gè)sockfd。每請(qǐng)求每線程的方式,并且代碼邏輯非常易讀。但是這只是理想,線程創(chuàng)建代價(jià),調(diào)度代價(jià)就呵呵了。
先來(lái)看一下每請(qǐng)求每線程的代碼如下:
while(1) {
socklen_t len = sizeof(struct sockaddr_in);
int clientfd = accept(sockfd, (struct sockaddr*)&remote, &len);
pthread_t thread_id;
pthread_create(&thread_id, NULL, client_cb, &clientfd);
}
這樣的做法,寫(xiě)完放到生產(chǎn)環(huán)境下面,如果你的老板不打死你,你來(lái)找我。我來(lái)幫你老板,為民除害。
如果我們有協(xié)程,我們就可以這樣實(shí)現(xiàn)。參考代碼如下:
https://github.com/wangbojing/NtyCo/blob/master/nty_server_test.c
while (1) {
socklen_t len = sizeof(struct sockaddr_in);
int cli_fd = nty_accept(fd, (struct sockaddr*)&remote, &len);
nty_coroutine *read_co;
nty_coroutine_create(&read_co, server_reader, &cli_fd);
}
這樣的代碼是完全可以放在生成環(huán)境下面的。如果你的老板要打死你,你來(lái)找我,我?guī)湍惆涯憷习宕蛩?,為民除害?/p>
線程的API思維來(lái)使用協(xié)程,函數(shù)調(diào)用的性能來(lái)測(cè)試協(xié)程。
NtyCo封裝出來(lái)了若干接口,一類是協(xié)程本身的,二類是posix的異步封裝
協(xié)程API:while
- 協(xié)程創(chuàng)建
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg)
- 協(xié)程調(diào)度器的運(yùn)行
void nty_schedule_run(void)
POSIX異步封裝API:
int nty_socket(int domain, int type, int protocol)
int nty_accept(int fd, struct sockaddr *addr, socklen_t *len)
int nty_recv(int fd, void *buf, int length)
int nty_send(int fd, const void *buf, int length)
int nty_close(int fd)
接口格式與POSIX標(biāo)準(zhǔn)的函數(shù)定義一致。
第三章 協(xié)程的實(shí)現(xiàn)之工作流程
問(wèn)題:協(xié)程內(nèi)部是如何工作呢?
先來(lái)看一下協(xié)程服務(wù)器案例的代碼, 代碼參考:https://github.com/wangbojing/NtyCo/blob/master/nty_server_test.c
分別討論三個(gè)協(xié)程的比較晦澀的工作流程。第一個(gè)協(xié)程的創(chuàng)建;第二個(gè)IO異步操作;第三個(gè)協(xié)程子過(guò)程回調(diào)
3.1 創(chuàng)建協(xié)程
當(dāng)我們需要異步調(diào)用的時(shí)候,我們會(huì)創(chuàng)建一個(gè)協(xié)程。比如accept返回一個(gè)新的sockfd,創(chuàng)建一個(gè)客戶端處理的子過(guò)程。再比如需要監(jiān)聽(tīng)多個(gè)端口的時(shí)候,創(chuàng)建一個(gè)server的子過(guò)程,這樣多個(gè)端口同時(shí)工作的,是符合微服務(wù)的架構(gòu)的。
創(chuàng)建協(xié)程的時(shí)候,進(jìn)行了如何的工作?創(chuàng)建API如下:
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg)
參數(shù)1:nty_coroutine **new_co,需要傳入空的協(xié)程的對(duì)象,這個(gè)對(duì)象是由內(nèi)部創(chuàng)建的,并且在函數(shù)返回的時(shí)候,會(huì)返回一個(gè)內(nèi)部創(chuàng)建的協(xié)程對(duì)象。
參數(shù)2:proc_coroutine func,協(xié)程的子過(guò)程。當(dāng)協(xié)程被調(diào)度的時(shí)候,就會(huì)執(zhí)行該函數(shù)。
參數(shù)3:void *arg,需要傳入到新協(xié)程中的參數(shù)。
協(xié)程不存在親屬關(guān)系,都是一致的調(diào)度關(guān)系,接受調(diào)度器的調(diào)度。調(diào)用create API就會(huì)創(chuàng)建一個(gè)新協(xié)程,新協(xié)程就會(huì)加入到調(diào)度器的就緒隊(duì)列中。
創(chuàng)建的協(xié)程具體步驟會(huì)在《協(xié)程的實(shí)現(xiàn)之原語(yǔ)操作》來(lái)描述。
3.2 實(shí)現(xiàn)IO異步操作
大部分的朋友會(huì)關(guān)心IO異步操作如何實(shí)現(xiàn),在send與recv調(diào)用的時(shí)候,如何實(shí)現(xiàn)異步操作的。
先來(lái)看一下一段代碼:
while (1) {
int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);
for (i = 0;i < nready;i ++) {
int sockfd = events[i].data.fd;
if (sockfd == listenfd) {
int connfd = accept(listenfd, xxx, xxxx);
setnonblock(connfd);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = connfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
} else {
epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);
recv(sockfd, buffer, length, 0);
//parser_proto(buffer, length);
send(sockfd, buffer, length, 0);
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, NULL);
}
}
}
在進(jìn)行IO操作(recv,send)之前,先執(zhí)行了 epoll_ctl的del操作,將相應(yīng)的sockfd從epfd中刪除掉,在執(zhí)行完IO操作(recv,send)再進(jìn)行epoll_ctl的add的動(dòng)作。這段代碼看起來(lái)似乎好像沒(méi)有什么作用。
如果是在多個(gè)上下文中,這樣的做法就很有意義了。能夠保證sockfd只在一個(gè)上下文中能夠操作IO的。不會(huì)出現(xiàn)在多個(gè)上下文同時(shí)對(duì)一個(gè)IO進(jìn)行操作的。協(xié)程的IO異步操作正式是采用此模式進(jìn)行的。
把單一協(xié)程的工作與調(diào)度器的工作的劃分清楚,先引入兩個(gè)原語(yǔ)操作 resume,yield會(huì)在《協(xié)程的實(shí)現(xiàn)之原語(yǔ)操作》來(lái)講解協(xié)程所有原語(yǔ)操作的實(shí)現(xiàn),yield就是讓出運(yùn)行,resume就是恢復(fù)運(yùn)行。調(diào)度器與協(xié)程的上下文切換如下圖所示
在協(xié)程的上下文IO異步操作(nty_recv,nty_send)函數(shù),步驟如下:
- 將sockfd 添加到epoll管理中。
- 進(jìn)行上下文環(huán)境切換,由協(xié)程上下文yield到調(diào)度器的上下文。
- 調(diào)度器獲取下一個(gè)協(xié)程上下文。Resume新的協(xié)程
IO異步操作的上下文切換的時(shí)序圖如下:
3.3 回調(diào)協(xié)程的子過(guò)程
在create協(xié)程后,何時(shí)回調(diào)子過(guò)程?何種方式回調(diào)子過(guò)程?
首先來(lái)回顧一下x86_64寄存器的相關(guān)知識(shí)。匯編與寄存器相關(guān)知識(shí)還會(huì)在《協(xié)程的實(shí)現(xiàn)之切換》繼續(xù)深入探討的。x86_64 的寄存器有16個(gè)64位寄存器,分別是:%rax, %rbx,
%rcx, %esi, %edi, %rbp, %rsp, %r8, %r9, %r10, %r11, %r12, %r13, %r14, %r15。
%rax 作為函數(shù)返回值使用的。
%rsp 棧指針寄存器,指向棧頂
%rdi, %rsi, %rdx, %rcx, %r8, %r9 用作函數(shù)參數(shù),依次對(duì)應(yīng)第1參數(shù),第2參數(shù)。。。
%rbx, %rbp, %r12, %r13, %r14, %r15 用作數(shù)據(jù)存儲(chǔ),遵循調(diào)用者使用規(guī)則,換句話說(shuō),就是隨便用。調(diào)用子函數(shù)之前要備份它,以防它被修改
%r10, %r11 用作數(shù)據(jù)存儲(chǔ),就是使用前要先保存原值
以NtyCo的實(shí)現(xiàn)為例,來(lái)分析這個(gè)過(guò)程。CPU有一個(gè)非常重要的寄存器叫做EIP,用來(lái)存儲(chǔ)CPU運(yùn)行下一條指令的地址。我們可以把回調(diào)函數(shù)的地址存儲(chǔ)到EIP中,將相應(yīng)的參數(shù)存儲(chǔ)到相應(yīng)的參數(shù)寄存器中。實(shí)現(xiàn)子過(guò)程調(diào)用的邏輯代碼如下:
void _exec(nty_coroutine *co) {
co- >func(co- >arg); //子過(guò)程的回調(diào)函數(shù)
}
void nty_coroutine_init(nty_coroutine *co) {
//ctx 就是協(xié)程的上下文
co- >ctx.edi = (void*)co; //設(shè)置參數(shù)
co- >ctx.eip = (void*)_exec; //設(shè)置回調(diào)函數(shù)入口
//當(dāng)實(shí)現(xiàn)上下文切換的時(shí)候,就會(huì)執(zhí)行入口函數(shù)_exec , _exec 調(diào)用子過(guò)程func
}
第四章 協(xié)程的實(shí)現(xiàn)之原語(yǔ)操作
問(wèn)題:協(xié)程的內(nèi)部原語(yǔ)操作有哪些?分別如何實(shí)現(xiàn)的?
協(xié)程的核心原語(yǔ)操作:create, resume, yield。協(xié)程的原語(yǔ)操作有create怎么沒(méi)有exit?以NtyCo為例,協(xié)程一旦創(chuàng)建就不能有用戶自己銷毀,必須得以子過(guò)程執(zhí)行結(jié)束,就會(huì)自動(dòng)銷毀協(xié)程的上下文數(shù)據(jù)。以_exec執(zhí)行入口函數(shù)返回而銷毀協(xié)程的上下文與相關(guān)信息。co->func(co->arg) 是子過(guò)程,若用戶需要長(zhǎng)久運(yùn)行協(xié)程,就必須要在func函數(shù)里面寫(xiě)入循環(huán)等操作。所以NtyCo里面沒(méi)有實(shí)現(xiàn)exit的原語(yǔ)操作。
create:創(chuàng)建一個(gè)協(xié)程。
- 調(diào)度器是否存在,不存在也創(chuàng)建。調(diào)度器作為全局的單例。將調(diào)度器的實(shí)例存儲(chǔ)在線程的私有空間pthread_setspecific。
- 分配一個(gè)coroutine的內(nèi)存空間,分別設(shè)置coroutine的數(shù)據(jù)項(xiàng),??臻g,棧大小,初始狀態(tài),創(chuàng)建時(shí)間,子過(guò)程回調(diào)函數(shù),子過(guò)程的調(diào)用參數(shù)。
- 將新分配協(xié)程添加到就緒隊(duì)列 ready_queue中
實(shí)現(xiàn)代碼如下:
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg) {
assert(pthread_once(&sched_key_once, nty_coroutine_sched_key_creator) == 0);
nty_schedule *sched = nty_coroutine_get_sched();
if (sched == NULL) {
nty_schedule_create(0);
sched = nty_coroutine_get_sched();
if (sched == NULL) {
printf("Failed to create schedulern");
return -1;
}
}
nty_coroutine *co = calloc(1, sizeof(nty_coroutine));
if (co == NULL) {
printf("Failed to allocate memory for new coroutinen");
return -2;
}
//
int ret = posix_memalign(&co- >stack, getpagesize(), sched- >stack_size);
if (ret) {
printf("Failed to allocate stack for new coroutinen");
free(co);
return -3;
}
co- >sched = sched;
co- >stack_size = sched- >stack_size;
co- >status = BIT(NTY_COROUTINE_STATUS_NEW); //
co- >id = sched- >spawned_coroutines ++;
co- >func = func;
co- >fd = -1;
co- >events = 0;
co- >arg = arg;
co- >birth = nty_coroutine_usec_now();
*new_co = co;
TAILQ_INSERT_TAIL(&co- >sched- >ready, co, ready_next);
return 0;
}
yield:讓出CPU。
void nty_coroutine_yield(nty_coroutine *co)
參數(shù):當(dāng)前運(yùn)行的協(xié)程實(shí)例
調(diào)用后該函數(shù)不會(huì)立即返回,而是切換到最近執(zhí)行resume的上下文。該函數(shù)返回是在執(zhí)行resume的時(shí)候,會(huì)有調(diào)度器統(tǒng)一選擇resume的,然后再次調(diào)用yield的。resume與yield是兩個(gè)可逆過(guò)程的原子操作。
resume:恢復(fù)協(xié)程的運(yùn)行權(quán)
int nty_coroutine_resume(nty_coroutine *co)
參數(shù):需要恢復(fù)運(yùn)行的協(xié)程實(shí)例
調(diào)用后該函數(shù)也不會(huì)立即返回,而是切換到運(yùn)行協(xié)程實(shí)例的yield的位置。返回是在等協(xié)程相應(yīng)事務(wù)處理完成后,主動(dòng)yield會(huì)返回到resume的地方。
第五章 協(xié)程的實(shí)現(xiàn)之切換
問(wèn)題:協(xié)程的上下文如何切換?切換代碼如何實(shí)現(xiàn)?
首先來(lái)回顧一下x86_64寄存器的相關(guān)知識(shí)。x86_64 的寄存器有16個(gè)64位寄存器,分別是:%rax, %rbx, %rcx, %esi, %edi, %rbp, %rsp, %r8, %r9, %r10, %r11, %r12,
%r13, %r14, %r15。
%rax 作為函數(shù)返回值使用的。
%rsp 棧指針寄存器,指向棧頂
%rdi, %rsi, %rdx, %rcx, %r8, %r9 用作函數(shù)參數(shù),依次對(duì)應(yīng)第1參數(shù),第2參數(shù)。。。
%rbx, %rbp, %r12, %r13, %r14, %r15 用作數(shù)據(jù)存儲(chǔ),遵循調(diào)用者使用規(guī)則,換句話說(shuō),就是隨便用。調(diào)用子函數(shù)之前要備份它,以防它被修改
%r10, %r11 用作數(shù)據(jù)存儲(chǔ),就是使用前要先保存原值。
上下文切換,就是將CPU的寄存器暫時(shí)保存,再將即將運(yùn)行的協(xié)程的上下文寄存器,分別mov到相對(duì)應(yīng)的寄存器上。此時(shí)上下文完成切換。如下圖所示:
切換_switch函數(shù)定義:
int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
參數(shù)1:即將運(yùn)行協(xié)程的上下文,寄存器列表
參數(shù)2:正在運(yùn)行協(xié)程的上下文,寄存器列表
我們nty_cpu_ctx結(jié)構(gòu)體的定義,為了兼容x86,結(jié)構(gòu)體項(xiàng)命令采用的是x86的寄存器名字命名。
typedef struct _nty_cpu_ctx {
void *esp; //
void *ebp;
void *eip;
void *edi;
void *esi;
void *ebx;
void *r1;
void *r2;
void *r3;
void *r4;
void *r5;
} nty_cpu_ctx;
_switch返回后,執(zhí)行即將運(yùn)行協(xié)程的上下文。是實(shí)現(xiàn)上下文的切換
_switch的實(shí)現(xiàn)代碼:
0: __asm__ (
1: " .text n"
2: " .p2align 4,,15 n"
3: ".globl _switch n"
4: ".globl __switch n"
5: "_switch: n"
6: "__switch: n"
7: " movq %rsp, 0(%rsi) # save stack_pointer n"
8: " movq %rbp, 8(%rsi) # save frame_pointer n"
9: " movq (%rsp), %rax # save insn_pointer n"
10: " movq %rax, 16(%rsi) n"
11: " movq %rbx, 24(%rsi) # save rbx,r12-r15 n"
12: " movq %r12, 32(%rsi) n"
13: " movq %r13, 40(%rsi) n"
14: " movq %r14, 48(%rsi) n"
15: " movq %r15, 56(%rsi) n"
16: " movq 56(%rdi), %r15 n"
17: " movq 48(%rdi), %r14 n"
18: " movq 40(%rdi), %r13 # restore rbx,r12-r15 n"
19: " movq 32(%rdi), %r12 n"
20: " movq 24(%rdi), %rbx n"
21: " movq 8(%rdi), %rbp # restore frame_pointer n"
22: " movq 0(%rdi), %rsp # restore stack_pointer n"
23: " movq 16(%rdi), %rax # restore insn_pointer n"
24: " movq %rax, (%rsp) n"
25: " ret n"
26: );
按照x86_64的寄存器定義,%rdi保存第一個(gè)參數(shù)的值,即new_ctx的值,%rsi保存第二個(gè)參數(shù)的值,即保存cur_ctx的值。X86_64每個(gè)寄存器是64bit,8byte。
Movq %rsp, 0(%rsi) 保存在棧指針到cur_ctx實(shí)例的rsp項(xiàng)
Movq %rbp, 8(%rsi)
Movq (%rsp), %rax #將棧頂?shù)刂防锩娴闹荡鎯?chǔ)到rax寄存器中。Ret后出棧,執(zhí)行棧頂
Movq %rbp, 8(%rsi) #后續(xù)的指令都是用來(lái)保存CPU的寄存器到new_ctx的每一項(xiàng)中
Movq 8(%rdi), %rbp #將new_ctx的值
Movq 16(%rdi), %rax #將指令指針rip的值存儲(chǔ)到rax中
Movq %rax, (%rsp) # 將存儲(chǔ)的rip值的rax寄存器賦值給棧指針的地址的值。
Ret # 出棧,回到棧指針,執(zhí)行rip指向的指令。
上下文環(huán)境的切換完成。
第六章 協(xié)程的實(shí)現(xiàn)之定義
問(wèn)題:協(xié)程如何定義? 調(diào)度器如何定義?
先來(lái)一道設(shè)計(jì)題:
設(shè)計(jì)一個(gè)協(xié)程的運(yùn)行體R與運(yùn)行體調(diào)度器S的結(jié)構(gòu)體
- 運(yùn)行體R:包含運(yùn)行狀態(tài){就緒,睡眠,等待},運(yùn)行體回調(diào)函數(shù),回調(diào)參數(shù),棧指針,棧大小,當(dāng)前運(yùn)行體
- 調(diào)度器S:包含執(zhí)行集合{就緒,睡眠,等待}
這道設(shè)計(jì)題拆分兩個(gè)個(gè)問(wèn)題,一個(gè)運(yùn)行體如何高效地在多種狀態(tài)集合更換。調(diào)度器與運(yùn)行體的功能界限。
6.1 運(yùn)行體如何高效地在多種狀態(tài)集合更換
新創(chuàng)建的協(xié)程,創(chuàng)建完成后,加入到就緒集合,等待調(diào)度器的調(diào)度;協(xié)程在運(yùn)行完成后,進(jìn)行IO操作,此時(shí)IO并未準(zhǔn)備好,進(jìn)入等待狀態(tài)集合;IO準(zhǔn)備就緒,協(xié)程開(kāi)始運(yùn)行,后續(xù)進(jìn)行sleep操作,此時(shí)進(jìn)入到睡眠狀態(tài)集合。
就緒(ready),睡眠(sleep),等待(wait)集合該采用如何數(shù)據(jù)結(jié)構(gòu)來(lái)存儲(chǔ)?
就緒(ready)集合并不沒(méi)有設(shè)置優(yōu)先級(jí)的選型,所有在協(xié)程優(yōu)先級(jí)一致,所以可以使用隊(duì)列來(lái)存儲(chǔ)就緒的協(xié)程,簡(jiǎn)稱為就緒隊(duì)列(ready_queue)。
睡眠(sleep)集合需要按照睡眠時(shí)長(zhǎng)進(jìn)行排序,采用紅黑樹(shù)來(lái)存儲(chǔ),簡(jiǎn)稱睡眠樹(shù)(sleep_tree)紅黑樹(shù)在工程實(shí)用為, key為睡眠時(shí)長(zhǎng),value為對(duì)應(yīng)的協(xié)程結(jié)點(diǎn)。
等待(wait)集合,其功能是在等待IO準(zhǔn)備就緒,等待IO也是有時(shí)長(zhǎng)的,所以等待(wait)集合采用紅黑樹(shù)的來(lái)存儲(chǔ),簡(jiǎn)稱等待樹(shù)(wait_tree),此處借鑒nginx的設(shè)計(jì)。
數(shù)據(jù)結(jié)構(gòu)如下圖所示:
Coroutine就是協(xié)程的相應(yīng)屬性,status表示協(xié)程的運(yùn)行狀態(tài)。sleep與wait兩顆紅黑樹(shù),ready使用的隊(duì)列,比如某協(xié)程調(diào)用sleep函數(shù),加入睡眠樹(shù)(sleep_tree),status |= S即可。比如某協(xié)程在等待樹(shù)(wait_tree)中,而IO準(zhǔn)備就緒放入ready隊(duì)列中,只需要移出等待樹(shù)(wait_tree),狀態(tài)更改status &= ~W即可。有一個(gè)前提條件就是不管何種運(yùn)行狀態(tài)的協(xié)程,都在就緒隊(duì)列中,只是同時(shí)包含有其他的運(yùn)行狀態(tài)。
6.2 調(diào)度器與協(xié)程的功能界限
每一協(xié)程都需要使用的而且可能會(huì)不同屬性的,就是協(xié)程屬性。每一協(xié)程都需要的而且數(shù)據(jù)一致的,就是調(diào)度器的屬性。比如棧大小的數(shù)值,每個(gè)協(xié)程都一樣的后不做更改可以作為調(diào)度器的屬性,如果每個(gè)協(xié)程大小不一致,則可以作為協(xié)程的屬性。
用來(lái)管理所有協(xié)程的屬性,作為調(diào)度器的屬性。比如epoll用來(lái)管理每一個(gè)協(xié)程對(duì)應(yīng)的IO,是需要作為調(diào)度器屬性。
按照前面幾章的描述,定義一個(gè)協(xié)程結(jié)構(gòu)體需要多少域,我們描述了每一個(gè)協(xié)程有自己的上下文環(huán)境,需要保存CPU的寄存器ctx;需要有子過(guò)程的回調(diào)函數(shù)func;需要有子過(guò)程回調(diào)函數(shù)的參數(shù) arg;需要定義自己的??臻g stack;需要有自己棧空間的大小 stack_size;需要定義協(xié)程的創(chuàng)建時(shí)間 birth;需要定義協(xié)程當(dāng)前的運(yùn)行狀態(tài) status;需要定當(dāng)前運(yùn)行狀態(tài)的結(jié)點(diǎn)(ready_next, wait_node, sleep_node);需要定義協(xié)程id;需要定義調(diào)度器的全局對(duì)象 sched。
協(xié)程的核心結(jié)構(gòu)體如下:
typedef struct _nty_coroutine {
nty_cpu_ctx ctx;
proc_coroutine func;
void *arg;
size_t stack_size;
nty_coroutine_status status;
nty_schedule *sched;
uint64_t birth;
uint64_t id;
void *stack;
RB_ENTRY(_nty_coroutine) sleep_node;
RB_ENTRY(_nty_coroutine) wait_node;
TAILQ_ENTRY(_nty_coroutine) ready_next;
TAILQ_ENTRY(_nty_coroutine) defer_next;
} nty_coroutine;
調(diào)度器是管理所有協(xié)程運(yùn)行的組件,協(xié)程與調(diào)度器的運(yùn)行關(guān)系。
調(diào)度器的屬性,需要有保存CPU的寄存器上下文 ctx,可以從協(xié)程運(yùn)行狀態(tài)yield到調(diào)度器運(yùn)行的。從協(xié)程到調(diào)度器用yield,從調(diào)度器到協(xié)程用resume
以下為協(xié)程的定義。
typedef struct _nty_coroutine_queue nty_coroutine_queue;
typedef struct _nty_coroutine_rbtree_sleep nty_coroutine_rbtree_sleep;
typedef struct _nty_coroutine_rbtree_wait nty_coroutine_rbtree_wait;
typedef struct _nty_schedule {
uint64_t birth;
nty_cpu_ctx ctx;
struct _nty_coroutine *curr_thread;
int page_size;
int poller_fd;
int eventfd;
struct epoll_event eventlist[NTY_CO_MAX_EVENTS];
int nevents;
int num_new_events;
nty_coroutine_queue ready;
nty_coroutine_rbtree_sleep sleeping;
nty_coroutine_rbtree_wait waiting;
} nty_schedule;
第七章 協(xié)程的實(shí)現(xiàn)之調(diào)度器
問(wèn)題:協(xié)程如何被調(diào)度?
調(diào)度器的實(shí)現(xiàn),有兩種方案,一種是生產(chǎn)者消費(fèi)者模式,另一種多狀態(tài)運(yùn)行。
7.1 生產(chǎn)者消費(fèi)者模式
邏輯代碼如下:
while (1) {
//遍歷睡眠集合,將滿足條件的加入到ready
nty_coroutine *expired = NULL;
while ((expired = sleep_tree_expired(sched)) != ) {
TAILQ_ADD(&sched- >ready, expired);
}
//遍歷等待集合,將滿足添加的加入到ready
nty_coroutine *wait = NULL;
int nready = epoll_wait(sched- >epfd, events, EVENT_MAX, 1);
for (i = 0;i < nready;i ++) {
wait = wait_tree_search(events[i].data.fd);
TAILQ_ADD(&sched- >ready, wait);
}
// 使用resume回復(fù)ready的協(xié)程運(yùn)行權(quán)
while (!TAILQ_EMPTY(&sched- >ready)) {
nty_coroutine *ready = TAILQ_POP(sched- >ready);
resume(ready);
}
}
7.2 多狀態(tài)運(yùn)行
實(shí)現(xiàn)邏輯代碼如下:
while (1) {
//遍歷睡眠集合,使用resume恢復(fù)expired的協(xié)程運(yùn)行權(quán)
nty_coroutine *expired = NULL;
while ((expired = sleep_tree_expired(sched)) != ) {
resume(expired);
}
//遍歷等待集合,使用resume恢復(fù)wait的協(xié)程運(yùn)行權(quán)
nty_coroutine *wait = NULL;
int nready = epoll_wait(sched- >epfd, events, EVENT_MAX, 1);
for (i = 0;i < nready;i ++) {
wait = wait_tree_search(events[i].data.fd);
resume(wait);
}
// 使用resume恢復(fù)ready的協(xié)程運(yùn)行權(quán)
while (!TAILQ_EMPTY(sched- >ready)) {
nty_coroutine *ready = TAILQ_POP(sched- >ready);
resume(ready);
}
}
第八章 協(xié)程性能測(cè)試
測(cè)試環(huán)境:4臺(tái)VMWare 虛擬機(jī)
1臺(tái)服務(wù)器 6G內(nèi)存,4核CPU
3臺(tái)客戶端 2G內(nèi)存,2核CPU
操作系統(tǒng):ubuntu 14.04
服務(wù)器端測(cè)試代碼:https://https://github.com/wangbojing/NtyCotyCo
客戶端測(cè)試代碼:https://https://github.com/wangbojing/c1000k_test/blob/master/client_mutlport_epoll.c1000k_test/blob/master/client_mutlport_epoll.c
按照每一個(gè)連接啟動(dòng)一個(gè)協(xié)程來(lái)測(cè)試。每一個(gè)協(xié)程??臻g 4096byte
6G內(nèi)存 –> 測(cè)試協(xié)程數(shù)量100W無(wú)異常。并且能夠正常收發(fā)數(shù)據(jù)。
-
程序
+關(guān)注
關(guān)注
117文章
3787瀏覽量
81049 -
代碼
+關(guān)注
關(guān)注
30文章
4788瀏覽量
68616 -
調(diào)度器
+關(guān)注
關(guān)注
0文章
98瀏覽量
5249
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論