Development environment:
RT-Thread version: 4.0.3
Operating system: Windows 10
Keil version: V5.30
RT-Thread Studio version: 2.0.1
lwIP: 2.0.2
3 Select/Poll Overview
In lwIP, a concurrent server can be built on the Sequential API, but that approach needs multithreading: a thread is created to handle the data of each connection. For resource-constrained embedded devices, creating a thread per connection is far too expensive, so we need a different approach: I/O multiplexing, i.e. the select mechanism.
Select and poll are specified by POSIX, and most operating systems and protocol stacks implement them.
Note that in Linux both poll and select are built on the kernel function sys_poll; the difference is that select was inherited from BSD Unix while poll came from System V Unix, so the two behave much alike. poll has no limit on the maximum number of file descriptors. Like select, though, poll copies the whole array of file descriptors back and forth between user and kernel address space, so the overhead grows linearly with the number of descriptors.
3.1 The select Function
In the BSD socket API, the select prototype is as follows:
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
[Parameters]
- nfds: the number of file descriptors select monitors, normally set to the largest descriptor value among the monitored files plus 1.
- readfds: the descriptor set monitored for readability; when select returns, the descriptors that are not readable are removed from readfds, leaving only the readable ones.
- writefds: the descriptor set monitored for writability; when select returns, the descriptors that are not writable are removed from writefds, leaving only the writable ones.
- exceptfds: the descriptor set monitored for exceptional conditions; it can be used, for example, to watch for out-of-band (OOB) data, which is sent to a socket with the MSG_OOB flag. When select returns, exceptfds keeps only the descriptors with OOB data pending.
- timeout: a pointer to a struct timeval; it makes select() return after timeout has elapsed if no descriptor became ready. The timeval structure gives the interval in seconds and microseconds. It puts select into one of three modes:
(1) Passing NULL (no time structure) blocks select until some descriptor in the monitored sets changes state.
(2) Setting the time to 0 seconds and 0 microseconds makes select purely non-blocking: it returns immediately whether or not any descriptor changed, returning 0 if nothing changed and a positive value otherwise.
(3) A timeout greater than 0 is the maximum time to wait: select blocks for at most that long, returns early if an event arrives within the window, and returns in any case once the timeout expires, with the same return values as above.
The timeval structure is defined as follows:
struct timeval
{
    int tv_sec;  /* seconds */
    int tv_usec; /* microseconds */
};
[Return value]
- int: the number of ready descriptors, 0 on timeout, or -1 on error.
The following macros set, clear, and test a file descriptor set:
FD_ZERO(fd_set *set);          // clear a file descriptor set
FD_SET(int fd, fd_set *set);   // add a file descriptor to the set
FD_CLR(int fd, fd_set *set);   // remove a file descriptor from the set
FD_ISSET(int fd, fd_set *set); // test whether a file descriptor is set
fd_set can be thought of as a set whose elements are file descriptors (file handles). The three middle parameters of select specify the sets of descriptors the kernel should test for read, write, and exceptional conditions; if you are not interested in one of them, pass a null pointer.
The select() mechanism provides the fd_set data structure, which is essentially an array of longs; each bit can be associated with an open file handle (a socket, a regular file, a named pipe, a device handle, and so on). The programmer establishes these associations, and when select() is called the kernel modifies the contents of fd_set according to the I/O state, thereby telling the process that called select() which sockets or files are readable.
3.2 The poll Function
The poll prototype:
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
[Parameters]
- fds: an array of struct pollfd holding the socket descriptors whose state is to be checked; unlike select, the fds array is not cleared when poll returns. Each pollfd structure describes one monitored file descriptor, so passing fds lets poll() monitor several descriptors at once.
struct pollfd is defined as follows:
typedef struct pollfd {
    int   fd;      /* the file descriptor to be checked */
    short events;  /* events of interest on fd */
    short revents; /* events that actually occurred on fd */
} pollfd_t;
The events field is the mask of events to monitor on that descriptor and is filled in by the user; the revents field is the mask of events that actually occurred, filled in by the kernel when the call returns.
- nfds: the number of descriptors in the fds array.
- timeout: the number of milliseconds to wait; poll() then returns whether or not any I/O is ready, just like select.
[Return value]
- int: the number of descriptors in fds that are ready for reading or writing or have an error; 0 means the call timed out and -1 means an error occurred.
poll describes the descriptor set differently: it uses pollfd structures instead of select's fd_set, so the number of descriptors it supports goes far beyond select's limit of 1024. This is the main difference from select.
4 The lwIP select/poll Implementation
Now let's look at how lwIP implements select and poll.
4.1 The lwip_select Implementation
lwIP now fully implements select, built on a semaphore mechanism; the function is lwip_select.
The basic flow of lwIP's select is:
1. Check the event flags of each socket in the socket sets in turn; if an event is pending, record that socket.
2. If one or more events exist, return; otherwise create a semaphore and block on it. The semaphores are tracked in select_cb_list, a linked list defined in [sockets.c]:
static struct lwip_select_cb *select_cb_list; // linked list of pending select calls
struct lwip_select_cb is defined as follows:
/** Description for a task waiting in select */
struct lwip_select_cb {
/** Pointer to the next waiting task */
struct lwip_select_cb *next;
/** Pointer to the previous waiting task */
struct lwip_select_cb *prev;
#if LWIP_SOCKET_SELECT
/** readset passed to select */
fd_set *readset;
/** writeset passed to select */
fd_set *writeset;
/** unimplemented: exceptset passed to select */
fd_set *exceptset;
#endif /* LWIP_SOCKET_SELECT */
#if LWIP_SOCKET_POLL
/** fds passed to poll; NULL if select */
struct pollfd *poll_fds;
/** nfds passed to poll; 0 if select */
nfds_t poll_nfds;
#endif /* LWIP_SOCKET_POLL */
/** don't signal the same semaphore twice: set to 1 when signalled */
int sem_signalled; // whether the semaphore has already been signalled
/** semaphore to wake up a task waiting for select */
SELECT_SEM_T sem; // the semaphore the selecting task blocks on
};
3. When the socket is set up, the callback event_callback is registered with its netconn structure. When an event occurs the callback is executed: it walks select_cb_list, and whenever a waiting entry covers the socket, that entry's semaphore is released.
Now let's look at the concrete implementation of lwip_select:
int lwip_select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,
struct timeval *timeout)
{
u32_t waitres = 0; // result of the semaphore wait
int nready;
fd_set lreadset, lwriteset, lexceptset; // local sets recording the sockets with events
u32_t msectimeout;
int i;
int maxfdp2;
#if LWIP_NETCONN_SEM_PER_THREAD
int waited = 0;
#endif
#if LWIP_NETCONN_FULLDUPLEX
fd_set used_sockets;
#endif
SYS_ARCH_DECL_PROTECT(lev);
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select(%d, %p, %p, %p, tvsec=%"S32_F" tvusec=%"S32_F")\n",
maxfdp1, (void *)readset, (void *) writeset, (void *) exceptset,
timeout ? (s32_t)timeout->tv_sec : (s32_t) - 1,
timeout ? (s32_t)timeout->tv_usec : (s32_t) - 1));
if ((maxfdp1 < 0) || (maxfdp1 > LWIP_SELECT_MAXNFDS)) {
set_errno(EINVAL);
return -1;
}
lwip_select_inc_sockets_used(maxfdp1, readset, writeset, exceptset, &used_sockets);
/* Go through each socket in each list to count number of sockets which
currently match */
// scan the socket sets for pending events
nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);
if (nready < 0) {
/* one of the sockets in one of the fd_sets was invalid */
set_errno(EBADF);
lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
return -1;
} else if (nready > 0) {
/* one or more sockets are set, no need to wait */
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: nready=%d\n", nready));
} else {
/* If we don't have any current events, then suspend if we are supposed to */
if (timeout && timeout->tv_sec == 0 && timeout->tv_usec == 0) {
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: no timeout, returning 0\n"));
/* This is OK as the local fdsets are empty and nready is zero,
or we would have returned earlier. */
} else {
/* None ready: add our semaphore to list:
We don't actually need any dynamic memory. Our entry on the
list is only valid while we are in this function, so it's ok
to use local variables (unless we're running in MPU compatible
mode). */
API_SELECT_CB_VAR_DECLARE(select_cb);
API_SELECT_CB_VAR_ALLOC(select_cb, set_errno(ENOMEM); lwip_select_dec_sockets_used(maxfdp1, &used_sockets); return -1);
memset(&API_SELECT_CB_VAR_REF(select_cb), 0, sizeof(struct lwip_select_cb));
API_SELECT_CB_VAR_REF(select_cb).readset = readset;
API_SELECT_CB_VAR_REF(select_cb).writeset = writeset;
API_SELECT_CB_VAR_REF(select_cb).exceptset = exceptset;
#if LWIP_NETCONN_SEM_PER_THREAD
API_SELECT_CB_VAR_REF(select_cb).sem = LWIP_NETCONN_THREAD_SEM_GET();
#else /* LWIP_NETCONN_SEM_PER_THREAD */
if (sys_sem_new(&API_SELECT_CB_VAR_REF(select_cb).sem, 0) != ERR_OK) {
/* failed to create semaphore */
set_errno(ENOMEM);
lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
API_SELECT_CB_VAR_FREE(select_cb);
return -1;
}
#endif /* LWIP_NETCONN_SEM_PER_THREAD */
lwip_link_select_cb(&API_SELECT_CB_VAR_REF(select_cb));
/* Increase select_waiting for each socket we are interested in */
maxfdp2 = maxfdp1;
for (i = LWIP_SOCKET_OFFSET; i < maxfdp1; i++) {
if ((readset && FD_ISSET(i, readset)) ||
(writeset && FD_ISSET(i, writeset)) ||
(exceptset && FD_ISSET(i, exceptset))) {
struct lwip_sock *sock;
SYS_ARCH_PROTECT(lev);
sock = tryget_socket_unconn_locked(i);
if (sock != NULL) {
sock->select_waiting++; // socket exists and is being watched for read/write/error, so bump select_waiting
if (sock->select_waiting == 0) {
/* overflow - too many threads waiting */
sock->select_waiting--;
nready = -1;
maxfdp2 = i;
SYS_ARCH_UNPROTECT(lev);
done_socket(sock);
set_errno(EBUSY);
break;
}
SYS_ARCH_UNPROTECT(lev);
done_socket(sock);
} else {
/* Not a valid socket */
nready = -1;
maxfdp2 = i;
SYS_ARCH_UNPROTECT(lev);
set_errno(EBADF);
break;
}
}
}
if (nready >= 0) {
/* Call lwip_selscan again: there could have been events between
the last scan (without us on the list) and putting us on the list! */
// scan once more in case events arrived in the meantime
nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);
if (!nready) {
/* Still none ready, just wait to be woken */
if (timeout == 0) {
/* Wait forever */
msectimeout = 0;
} else {
long msecs_long = ((timeout->tv_sec * 1000) + ((timeout->tv_usec + 500) / 1000));
if (msecs_long <= 0) {
/* Wait 1ms at least (0 means wait forever) */
msectimeout = 1;
} else {
msectimeout = (u32_t)msecs_long;
}
}
// sleep for up to msectimeout, yielding the CPU
waitres = sys_arch_sem_wait(SELECT_SEM_PTR(API_SELECT_CB_VAR_REF(select_cb).sem), msectimeout);
#if LWIP_NETCONN_SEM_PER_THREAD
waited = 1;
#endif
}
}
/* Decrease select_waiting for each socket we are interested in */
for (i = LWIP_SOCKET_OFFSET; i < maxfdp2; i++) {
if ((readset && FD_ISSET(i, readset)) ||
(writeset && FD_ISSET(i, writeset)) ||
(exceptset && FD_ISSET(i, exceptset))) {
struct lwip_sock *sock;
SYS_ARCH_PROTECT(lev);
sock = tryget_socket_unconn_locked(i);
if (sock != NULL) {
/* for now, handle select_waiting==0... */
LWIP_ASSERT("sock->select_waiting > 0", sock->select_waiting > 0);
if (sock->select_waiting > 0) {
sock->select_waiting--; // the wait is over: decrement this socket's select_waiting
}
SYS_ARCH_UNPROTECT(lev);
done_socket(sock);
} else {
SYS_ARCH_UNPROTECT(lev);
/* Not a valid socket */
nready = -1;
set_errno(EBADF);
}
}
}
lwip_unlink_select_cb(&API_SELECT_CB_VAR_REF(select_cb));
#if LWIP_NETCONN_SEM_PER_THREAD
if (API_SELECT_CB_VAR_REF(select_cb).sem_signalled && (!waited || (waitres == SYS_ARCH_TIMEOUT))) {
/* don't leave the thread-local semaphore signalled */
sys_arch_sem_wait(API_SELECT_CB_VAR_REF(select_cb).sem, 1);
}
#else /* LWIP_NETCONN_SEM_PER_THREAD */
sys_sem_free(&API_SELECT_CB_VAR_REF(select_cb).sem);
#endif /* LWIP_NETCONN_SEM_PER_THREAD */
API_SELECT_CB_VAR_FREE(select_cb);
if (nready < 0) {
/* This happens when a socket got closed while waiting */
lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
return -1;
}
if (waitres == SYS_ARCH_TIMEOUT) {
/* Timeout */
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: timeout expired\n"));
/* This is OK as the local fdsets are empty and nready is zero,
or we would have returned earlier. */
} else {
/* See what's set now after waiting */
nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: nready=%d\n", nready));
}
}
}
lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
set_errno(0);
if (readset) {
*readset = lreadset;
}
if (writeset) {
*writeset = lwriteset;
}
if (exceptset) {
*exceptset = lexceptset;
}
return nready;
}
The crux of the code above is where socket->select_waiting is incremented and decremented. When the socket exists and events really need to be monitored, and no event is already pending and no timeout has occurred on entry, select_waiting is always incremented; the thread may then go to sleep. Normally, after the sleep ends, select_waiting is decremented, so on leaving the function select_waiting is back at its original value. But if close(socket) is called during the sleep, the socket structure can no longer be obtained, select_waiting is not decremented, and when the function exits select_waiting is one greater than on entry. Within this function the number of increments is always >= the number of decrements, so once the value fails to return to its original level it can never reach 0 again; the socket resource is then falsely held and can never be used by anyone else.
The concrete flow of lwip_select is shown below.
The select implementation relies on an important structure, lwip_sock, whose definition follows:
/** Contains all internal pointers and states used for a socket */
struct lwip_sock {
/** sockets currently are built on netconns, each socket has one netconn */
struct netconn *conn;
/** data that was left from the previous read */
union lwip_sock_lastdata lastdata;
#if LWIP_SOCKET_SELECT || LWIP_SOCKET_POLL
/** number of times data was received, set by event_callback(),
tested by the receive and select functions */
s16_t rcvevent;
/** number of times data was ACKed (free send buffer), set by event_callback(),
tested by select */
u16_t sendevent;
/** error happened for this socket, set by event_callback(), tested by select */
u16_t errevent;
/** counter of how many threads are waiting for this socket using select */
SELWAIT_T select_waiting;
#endif /* LWIP_SOCKET_SELECT || LWIP_SOCKET_POLL */
#if LWIP_NETCONN_FULLDUPLEX
/* counter of how many threads are using a struct lwip_sock (not the 'int') */
u8_t fd_used;
/* status of pending close/delete actions */
u8_t fd_free_pending;
#define LWIP_SOCK_FD_FREE_TCP 1
#define LWIP_SOCK_FD_FREE_FREE 2
#endif
#ifdef SAL_USING_POSIX
rt_wqueue_t wait_head;
#endif
};
When a socket receives data, lwip_sock obtains a pbuf (for TCP) or a netbuf (for UDP) through the netconn receive functions. The data these wrap may exceed the length the socket user asked to read, in which case the packet must be kept in the socket until the user's next read: lastdata points to the packet not yet fully consumed by the user (older lwIP versions also kept a lastoffset field for the offset of the unread data within the packet). The last few fields of lwip_sock exist for the select mechanism.
lwip_sock is the socket-layer implementation in the upper socket API; it wraps and extends the netconn structure and describes one concrete connection. All of its logic is implemented on top of the kernel netconn, and conn points to the netconn corresponding to the socket. The netconn structure is defined as follows:
/** A callback prototype to inform about events for a netconn */
typedef void (* netconn_callback)(struct netconn *, enum netconn_evt, u16_t len);
/** A netconn descriptor */
struct netconn {
/** type of the netconn (TCP, UDP or RAW) */
enum netconn_type type;
/** current state of the netconn */
enum netconn_state state;
/** the lwIP internal protocol control block */
union {
struct ip_pcb *ip;
struct tcp_pcb *tcp;
struct udp_pcb *udp;
struct raw_pcb *raw;
} pcb;
/** the last asynchronous unreported error this netconn had */
err_t pending_err;
#if !LWIP_NETCONN_SEM_PER_THREAD
/** sem that is used to synchronously execute functions in the core context */
sys_sem_t op_completed;
#endif
/** mbox where received packets are stored until they are fetched
by the netconn application thread (can grow quite big) */
sys_mbox_t recvmbox;
#if LWIP_TCP
/** mbox where new connections are stored until processed
by the application thread */
sys_mbox_t acceptmbox;
#endif /* LWIP_TCP */
#if LWIP_NETCONN_FULLDUPLEX
/** number of threads waiting on an mbox. This is required to unblock
all threads when closing while threads are waiting. */
int mbox_threads_waiting;
#endif
/** only used for socket layer */
#if LWIP_SOCKET
int socket;
#endif /* LWIP_SOCKET */
#if LWIP_SO_SNDTIMEO
/** timeout to wait for sending data (which means enqueueing data for sending
in internal buffers) in milliseconds */
s32_t send_timeout;
#endif /* LWIP_SO_SNDTIMEO */
#if LWIP_SO_RCVTIMEO
/** timeout in milliseconds to wait for new data to be received
(or connections to arrive for listening netconns) */
u32_t recv_timeout;
#endif /* LWIP_SO_RCVTIMEO */
#if LWIP_SO_RCVBUF
/** maximum amount of bytes queued in recvmbox
not used for TCP: adjust TCP_WND instead! */
int recv_bufsize;
/** number of bytes currently in recvmbox to be received,
tested against recv_bufsize to limit bytes on recvmbox
for UDP and RAW, used for FIONREAD */
int recv_avail;
#endif /* LWIP_SO_RCVBUF */
#if LWIP_SO_LINGER
/** values <0 mean linger is disabled, values > 0 are seconds to linger */
s16_t linger;
#endif /* LWIP_SO_LINGER */
/** flags holding more netconn-internal state, see NETCONN_FLAG_* defines */
u8_t flags;
#if LWIP_TCP
/** TCP: when data passed to netconn_write doesn't fit into the send buffer,
this temporarily stores the message.
Also used during connect and close. */
struct api_msg *current_msg;
#endif /* LWIP_TCP */
/** A callback function that is informed about events for this netconn */
netconn_callback callback;
};
As mentioned earlier, when the socket is set up the callback event_callback is registered with the netconn structure; this callback is the netconn_callback member of struct netconn. Let's now look at event_callback:
/**
* Callback registered in the netconn layer for each socket-netconn.
* Processes recvevent (data available) and wakes up tasks waiting for select.
*
* @note for LWIP_TCPIP_CORE_LOCKING any caller of this function
* must have the core lock held when signaling the following events
* as they might cause select_list_cb to be checked:
 * NETCONN_EVT_RCVPLUS: raised when data has been received by the stack
 * NETCONN_EVT_SENDPLUS: raised when data has been sent successfully
 * NETCONN_EVT_ERROR: raised on a connection error
* This requirement will be asserted in select_check_waiters()
*/
static void
event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len)
{
int s, check_waiters;
struct lwip_sock *sock;
SYS_ARCH_DECL_PROTECT(lev);
LWIP_UNUSED_ARG(len);
/* Get socket */
if (conn) {
s = conn->socket;
if (s < 0) {
/* Data comes in right away after an accept, even though
* the server task might not have created a new socket yet.
* Just count down (or up) if that's the case and we
* will use the data later. Note that only receive events
* can happen before the new socket is set up. */
SYS_ARCH_PROTECT(lev);
if (conn->socket < 0) {
if (evt == NETCONN_EVT_RCVPLUS) {
/* conn->socket is -1 on initialization
lwip_accept adjusts sock->recvevent if conn->socket < -1 */
conn->socket--;
}
SYS_ARCH_UNPROTECT(lev);
return;
}
s = conn->socket;
SYS_ARCH_UNPROTECT(lev);
}
sock = get_socket(s); // fetch the lwip_sock structure for this socket
if (!sock) {
return;
}
} else {
return;
}
check_waiters = 1;
// enter the critical section and update the socket's event counters according to the event
SYS_ARCH_PROTECT(lev);
/* Set event as required */
switch (evt) {
case NETCONN_EVT_RCVPLUS: // data has been received by the stack
sock->rcvevent++;
if (sock->rcvevent > 1) {
check_waiters = 0;
}
break;
case NETCONN_EVT_RCVMINUS: // data has been consumed by the user
sock->rcvevent--;
check_waiters = 0;
break;
case NETCONN_EVT_SENDPLUS: // data was sent successfully
if (sock->sendevent) {
check_waiters = 0;
}
sock->sendevent = 1;
break;
case NETCONN_EVT_SENDMINUS: // the user wrote data into the send buffer
sock->sendevent = 0;
check_waiters = 0;
break;
case NETCONN_EVT_ERROR: // connection error
sock->errevent = 1;
break;
default:
LWIP_ASSERT("unknown event", 0);
break;
}
// events updated: wake any select calls blocked on this socket
if (sock->select_waiting && check_waiters) {
/* Save which events are active */
int has_recvevent, has_sendevent, has_errevent;
has_recvevent = sock->rcvevent > 0;   // data readable
has_sendevent = sock->sendevent != 0; // data writable
has_errevent = sock->errevent != 0;   // error pending
SYS_ARCH_UNPROTECT(lev);
/* Check any select calls waiting on this socket */
select_check_waiters(s, has_recvevent, has_sendevent, has_errevent);
} else {
SYS_ARCH_UNPROTECT(lev);
}
done_socket(sock);
}
In short, event_callback implements the monitoring behind the readset/writeset/exceptset sets: it updates rcvevent, sendevent, and errevent and signals the semaphore of any lwip_select call blocked on the socket. lwip_select, in essence, reads rcvevent, sendevent, and errevent and acts on them, scanning for events mainly through lwip_selscan.
4.2 The lwip_poll Implementation
lwIP also fully implements poll; the function is lwip_poll. Its mechanism is much the same as lwip_select's, except that lwip_poll keeps the descriptors in the caller-supplied array of pollfd structures rather than in fd_set bitmaps, so it has no hard limit on the number of file descriptors. The lwip_poll source is as follows:
int lwip_poll(struct pollfd *fds, nfds_t nfds, int timeout)
{
u32_t waitres = 0;
int nready;
u32_t msectimeout;
#if LWIP_NETCONN_SEM_PER_THREAD
int waited = 0;
#endif
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll(%p, %d, %d)\n",
(void*)fds, (int)nfds, timeout));
LWIP_ERROR("lwip_poll: invalid fds", ((fds != NULL && nfds > 0) || (fds == NULL && nfds == 0)),
set_errno(EINVAL); return -1;);
lwip_poll_inc_sockets_used(fds, nfds);
/* Go through each struct pollfd to count number of structures
which currently match */
nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_CLEAR);
if (nready < 0) {
lwip_poll_dec_sockets_used(fds, nfds);
return -1;
}
/* If we don't have any current events, then suspend if we are supposed to */
if (!nready) {
API_SELECT_CB_VAR_DECLARE(select_cb);
if (timeout == 0) {
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: no timeout, returning 0\n"));
goto return_success;
}
API_SELECT_CB_VAR_ALLOC(select_cb, set_errno(EAGAIN); lwip_poll_dec_sockets_used(fds, nfds); return -1);
memset(&API_SELECT_CB_VAR_REF(select_cb), 0, sizeof(struct lwip_select_cb));
/* None ready: add our semaphore to list:
We don't actually need any dynamic memory. Our entry on the
list is only valid while we are in this function, so it's ok
to use local variables. */
API_SELECT_CB_VAR_REF(select_cb).poll_fds = fds;
API_SELECT_CB_VAR_REF(select_cb).poll_nfds = nfds;
#if LWIP_NETCONN_SEM_PER_THREAD
API_SELECT_CB_VAR_REF(select_cb).sem = LWIP_NETCONN_THREAD_SEM_GET();
#else /* LWIP_NETCONN_SEM_PER_THREAD */
if (sys_sem_new(&API_SELECT_CB_VAR_REF(select_cb).sem, 0) != ERR_OK) {
/* failed to create semaphore */
set_errno(EAGAIN);
lwip_poll_dec_sockets_used(fds, nfds);
API_SELECT_CB_VAR_FREE(select_cb);
return -1;
}
#endif /* LWIP_NETCONN_SEM_PER_THREAD */
lwip_link_select_cb(&API_SELECT_CB_VAR_REF(select_cb));
/* Increase select_waiting for each socket we are interested in.
Also, check for events again: there could have been events between
the last scan (without us on the list) and putting us on the list! */
nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_INC_WAIT);
if (!nready) {
/* Still none ready, just wait to be woken */
if (timeout < 0) {
/* Wait forever */
msectimeout = 0;
} else {
/* timeout == 0 would have been handled earlier. */
LWIP_ASSERT("timeout > 0", timeout > 0);
msectimeout = timeout;
}
waitres = sys_arch_sem_wait(SELECT_SEM_PTR(API_SELECT_CB_VAR_REF(select_cb).sem), msectimeout);
#if LWIP_NETCONN_SEM_PER_THREAD
waited = 1;
#endif
}
/* Decrease select_waiting for each socket we are interested in,
and check which events occurred while we waited. */
nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_DEC_WAIT);
lwip_unlink_select_cb(&API_SELECT_CB_VAR_REF(select_cb));
#if LWIP_NETCONN_SEM_PER_THREAD
if (API_SELECT_CB_VAR_REF(select_cb).sem_signalled && (!waited || (waitres == SYS_ARCH_TIMEOUT))) {
/* don't leave the thread-local semaphore signalled */
sys_arch_sem_wait(API_SELECT_CB_VAR_REF(select_cb).sem, 1);
}
#else /* LWIP_NETCONN_SEM_PER_THREAD */
sys_sem_free(&API_SELECT_CB_VAR_REF(select_cb).sem);
#endif /* LWIP_NETCONN_SEM_PER_THREAD */
API_SELECT_CB_VAR_FREE(select_cb);
if (nready < 0) {
/* This happens when a socket got closed while waiting */
lwip_poll_dec_sockets_used(fds, nfds);
return -1;
}
if (waitres == SYS_ARCH_TIMEOUT) {
/* Timeout */
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: timeout expired\n"));
goto return_success;
}
}
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: nready=%d\n", nready));
return_success:
lwip_poll_dec_sockets_used(fds, nfds);
set_errno(0);
return nready;
}
Like lwip_select it scans for events, only with lwip_pollscan as the scanning function. The remaining details are not analyzed here; see the lwIP source if you are interested.
The concrete flow of lwip_poll is shown below.
5 Implementing a Concurrent Server
The previous sections explained how select/poll is implemented in lwIP; now let's use them to build a concurrent server, taking select as the example.
The select concurrent-server model:
socket(...); // create the socket
bind(...);   // bind the address
listen(...); // listen
while(1)
{
    if(select(...) > 0)        // check whether a monitored socket is readable
    {
        if(FD_ISSET(...) > 0)  // the listening socket is readable: a new client is connecting
        {
            accept(...);       // take the completed connection
            process(...);      // handle the request and send back the result
        }
    }
    close(...); // close the connected socket returned by accept()
}
The select-based concurrent server model is therefore as follows.
In terms of flow, issuing I/O requests through select is not much different from the synchronous blocking model; it even adds the extra steps of registering the sockets to monitor and calling select, so for a single connection it is actually less efficient. The key advantage of select is that a single thread can service I/O on many sockets at once: the user registers multiple sockets and then repeatedly calls select to pick up whichever sockets became ready, handling multiple I/O requests within one thread. Under the synchronous blocking model, that is only achievable with multiple threads.
Server:
/**
******************************************************************************
* @file server.c
* @author BruceOu
* @rtt version V4.0.3
* @version V1.0
* @date 2022-06-08
* @blog https://blog.bruceou.cn/
* @Official Accounts 嵌入式實(shí)驗(yàn)樓
* @brief select-based server
******************************************************************************
*/
#include <rtthread.h>
#include <string.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <netdb.h>
#include <netdev.h>
#define SERVER_PORT 8888
#define BUFF_SIZE 1024
static char recvbuff[BUFF_SIZE];
static void net_server_thread_entry(void *parameter)
{
int sfd, cfd, maxfd, i, nready, n;
struct sockaddr_in server_addr, client_addr;
struct netdev *netdev = RT_NULL;
char sendbuff[] = "Hello client!";
socklen_t client_addr_len;
fd_set all_set, read_set;
// FD_SETSIZE counts the server's own fd too, so at most FD_SETSIZE - 1 clients
int clientfds[FD_SETSIZE - 1];
/* get the netdev network interface object by name */
netdev = netdev_get_by_name((char*)parameter);
if (netdev == RT_NULL)
{
rt_kprintf("get network interface device(%s) failed.\n", (char*)parameter);
}
// create the socket
if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
rt_kprintf("Socket create failed.\n");
}
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
//server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
/* use the IP address held by the netdev object */
server_addr.sin_addr.s_addr = netdev->ip_addr.addr;
// bind the socket
if (bind(sfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) < 0)
{
rt_kprintf("socket bind failed.\n");
closesocket(sfd);
}
rt_kprintf("socket bind network interface device(%s) success!\n", netdev->name);
// listen on the socket
if(listen(sfd, 5) == -1)
{
rt_kprintf("listen error");
}
else
{
rt_kprintf("listening...\n");
}
client_addr_len = sizeof(client_addr);
// initialize maxfd to sfd
maxfd = sfd;
// clear the fd set
FD_ZERO(&all_set);
// add the listening fd to the set
FD_SET(sfd, &all_set);
// initialize the client fd array
for(i = 0; i < FD_SETSIZE - 1; i++)
{
    // -1 marks an unused slot
    clientfds[i] = -1;
}
while(1)
{
    // select modifies the fd_set passed to it, so the master set cannot be
    // reused directly; copy all_set into read_set before every call
    read_set = all_set;
    nready = select(maxfd + 1, &read_set, NULL, NULL, NULL);
    // with a NULL timeout select never returns 0
    if(nready < 0)
    {
        rt_kprintf("select error \r\n");
    }
    // check whether the listening socket is readable
    if(FD_ISSET(sfd, &read_set))
    {
        // a new client is connecting
        cfd = accept(sfd, (struct sockaddr *)&client_addr, &client_addr_len);
        if(cfd < 0)
        {
            rt_kprintf("accept socket error\r\n");
            // go back to select
            continue;
        }
        rt_kprintf("new client connect fd = %d\r\n", cfd);
        // add the new cfd to the fd_set
        FD_SET(cfd, &all_set);
        // update maxfd for the next select
        maxfd = (cfd > maxfd) ? cfd : maxfd;
        // store the new cfd in the client fd array
        for(i = 0; i < FD_SETSIZE - 1; i++)
        {
            if(clientfds[i] == -1)
            {
                clientfds[i] = cfd;
                // slot found, stop searching
                break;
            }
        }
        // if no other socket is ready, skip the client scan below
        if(--nready == 0)
        {
            // go back to select
            continue;
        }
    }
    // walk all the client file descriptors
    for(i = 0; i < FD_SETSIZE - 1; i++)
    {
        if(clientfds[i] == -1)
        {
            // unused slot, keep going
            continue;
        }
        // check whether this client fd is in the ready set
        if(FD_ISSET(clientfds[i], &read_set))
        {
            n = recv(clientfds[i], recvbuff, sizeof(recvbuff) - 1, 0);
            if(n <= 0)
            {
                // remove the fd from the master set
                FD_CLR(clientfds[i], &all_set);
                // mark the slot unused
                clientfds[i] = -1;
            }
            else
            {
                recvbuff[n] = '\0';
                rt_kprintf("clientfd %d: %s \r\n", clientfds[i], recvbuff);
                // echo back to the client
                n = send(clientfds[i], sendbuff, strlen(sendbuff), 0);
                if(n < 0)
                {
                    // remove the fd from the master set
                    FD_CLR(clientfds[i], &all_set);
                    // mark the slot unused
                    clientfds[i] = -1;
                }
            }
        }
    }
}
}
static int server(int argc, char **argv)
{
    rt_err_t ret = RT_EOK;
    if (argc != 2)
    {
        rt_kprintf("server [netdev_name] --bind network interface device by name.\n");
        return -RT_ERROR;
    }
    /* create the server thread */
    rt_thread_t thread = rt_thread_create("server",
                                          net_server_thread_entry,
                                          argv[1],
                                          4096,
                                          10,
                                          10);
    /* start the thread if it was created successfully */
    if (thread != RT_NULL)
    {
        rt_thread_startup(thread);
    }
    else
    {
        ret = -RT_ERROR;
    }
    return ret;
}
#ifdef FINSH_USING_MSH
#include <finsh.h>
MSH_CMD_EXPORT(server, network interface device test);
#endif /* FINSH_USING_MSH */
Client (Linux version):
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define SERVPORT 8888
int main(int argc, char *argv[])
{
    char sendbuf[] = "Client1 : Hello Rtthread!";
    char recvbuf[1024];
    int sockfd, n;
    struct sockaddr_in serv_addr; // address of the server to connect to
    if (argc != 2)
    {
        fprintf(stderr, "usage: %s <server_ip>\n", argv[0]);
        exit(1);
    }
    // 1. create the socket
    // AF_INET means IPv4
    // SOCK_STREAM means TCP
    if((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
    {
        perror("socket");
        exit(1);
    }
    // fill in the server address
    serv_addr.sin_family = AF_INET;                 // network-layer protocol: IPv4
    serv_addr.sin_port = htons(SERVPORT);           // transport-layer port
    serv_addr.sin_addr.s_addr = inet_addr(argv[1]); // the server's actual IP address
    bzero(&(serv_addr.sin_zero), 8);                // zero the 8 reserved bytes
    // 2. connect to the server
    // three-way handshake; sockaddr_in must be cast to sockaddr
    if((connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(struct sockaddr))) < 0) {
        perror("connect failed!");
        exit(1);
    }
    printf("connect successful! \n");
    // 3. exchange messages with the server
    while (1)
    {
        send(sockfd, sendbuf, strlen(sendbuf), 0);
        n = recv(sockfd, recvbuf, sizeof(recvbuf) - 1, 0);
        if (n <= 0)
            break;
        recvbuf[n] = '\0';
        printf("Server : %s \n", recvbuf);
        sleep(2);
    }
    // 4. close
    close(sockfd);
    return 0;
}
Client (RT-Thread version):
/**
******************************************************************************
* @file client.c
* @author BruceOu
* @rtt version V4.0.3
* @version V1.0
* @date 2022-06-01
* @blog https://blog.bruceou.cn/
* @Official Accounts 嵌入式實(shí)驗(yàn)樓
* @brief client
******************************************************************************
*/
#include <rtthread.h>
#include <string.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <netdb.h>
#include <netdev.h>
#define SERVER_HOST "192.168.101.8"
#define SERVER_PORT 8888
static int client(int argc, char **argv)
{
struct sockaddr_in client_addr;
struct sockaddr_in server_addr;
struct netdev *netdev = RT_NULL;
int sockfd = -1;
char sendbuf[] = "Hello RT-Thread! \r\n";
char recvbuf[1024];
if (argc != 2)
{
rt_kprintf("client [netdev_name] --bind network interface device by name.\n");
return -RT_ERROR;
}
/* get the netdev network interface object by name */
netdev = netdev_get_by_name(argv[1]);
if (netdev == RT_NULL)
{
rt_kprintf("get network interface device(%s) failed.\n", argv[1]);
return -RT_ERROR;
}
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
rt_kprintf("Socket create failed.\n");
return -RT_ERROR;
}
/* initialize the client address to bind */
client_addr.sin_family = AF_INET;
client_addr.sin_port = htons(8080);
/* use the IP address held by the netdev object */
client_addr.sin_addr.s_addr = netdev->ip_addr.addr;
rt_memset(&(client_addr.sin_zero), 0, sizeof(client_addr.sin_zero));
if (bind(sockfd, (struct sockaddr *)&client_addr, sizeof(struct sockaddr)) < 0)
{
rt_kprintf("socket bind failed.\n");
closesocket(sockfd);
return -RT_ERROR;
}
rt_kprintf("socket bind network interface device(%s) success!\n", netdev->name);
/* initialize the server address to connect to */
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
server_addr.sin_addr.s_addr = inet_addr(SERVER_HOST);
rt_memset(&(server_addr.sin_zero), 0, sizeof(server_addr.sin_zero));
/* connect to the server */
if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) < 0)
{
rt_kprintf("socket connect failed!\n");
closesocket(sockfd);
return -RT_ERROR;
}
else
{
rt_kprintf("socket connect success!\n");
}
while (1)
{
    send(sockfd, sendbuf, strlen(sendbuf), 0);
    recv(sockfd, recvbuf, sizeof(recvbuf) - 1, 0);
    fputs(recvbuf, stdout);
    memset(recvbuf, 0, sizeof(recvbuf));
    rt_thread_mdelay(500);
}
/* close the connection */
closesocket(sockfd);
return RT_EOK;
}
#ifdef FINSH_USING_MSH
#include <finsh.h>
MSH_CMD_EXPORT(client, network interface device test);
#endif /* FINSH_USING_MSH */
Next comes verification. Getting the ART-Pi online was covered in earlier chapters and is not repeated here; see those chapters if anything is unclear.
Now start the server on the ART-Pi:
Server:
Then start the clients; here they run on Ubuntu:
Client:
Only two clients are used here; feel free to try more.
And of course, if you don't feel like writing a client, a network debugging assistant works for testing too.