0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

了解連接池、線程池、內(nèi)存池、異步請(qǐng)求池

科技綠洲 ? 來源:Linux開發(fā)架構(gòu)之路 ? 作者:Linux開發(fā)架構(gòu)之路 ? 2023-11-09 14:44 ? 次閱讀

池化技術(shù)

池化技術(shù)能夠減少資源對(duì)象的創(chuàng)建次數(shù),提?程序的響應(yīng)性能,特別是在?并發(fā)下這種提?更加明顯。使用池化技術(shù)緩存的資源對(duì)象有如下共同特點(diǎn):

  1. 對(duì)象創(chuàng)建時(shí)間長(zhǎng);
  2. 對(duì)象創(chuàng)建需要大量資源;
  3. 對(duì)象創(chuàng)建后可被重復(fù)使用像常見的線程池、內(nèi)存池、連接池、對(duì)象池都具有以上的共同特點(diǎn)。

連接池

什么是數(shù)據(jù)庫(kù)連接池

定義:數(shù)據(jù)庫(kù)連接池(Connection pooling)是程序啟動(dòng)時(shí)建立足夠的數(shù)據(jù)庫(kù)連接,并將這些連接組成一個(gè)連接池,由程序動(dòng)態(tài)地對(duì)池中的連接進(jìn)行申請(qǐng),使用,釋放。

大白話:創(chuàng)建數(shù)據(jù)庫(kù)連接是?個(gè)很耗時(shí)的操作,也容易對(duì)數(shù)據(jù)庫(kù)造成安全隱患。所以,在程序初始化的時(shí)候,集中創(chuàng)建多個(gè)數(shù)據(jù)庫(kù)連接,并把他們集中管理,供程序使用,可以保證較快的數(shù)據(jù)庫(kù)讀寫速度,還更加安全可靠。這里講的數(shù)據(jù)庫(kù),不單只是指Mysql,也同樣適用于Redis。

為什么使用數(shù)據(jù)庫(kù)連接池

  1. 資源復(fù)用:由于數(shù)據(jù)庫(kù)連接得到復(fù)用,避免了頻繁的創(chuàng)建、釋放連接引起的性能開銷,在減少系統(tǒng)消耗的基礎(chǔ)上,另一方面也增進(jìn)了系統(tǒng)運(yùn)行環(huán)境的平穩(wěn)性(減少內(nèi)存碎片以及數(shù)據(jù)庫(kù)臨時(shí)進(jìn)程/線程的數(shù)量)。
  2. 更快的系統(tǒng)響應(yīng)速度:數(shù)據(jù)庫(kù)連接池在初始化過程中,往往已經(jīng)創(chuàng)建了若干數(shù)據(jù)庫(kù)連接置于池中備用。此時(shí)連接的初始化工作均已完成。對(duì)于業(yè)務(wù)請(qǐng)求處理而言,直接利用現(xiàn)有可用連接,避免了從數(shù)據(jù)庫(kù)連接初始化和釋放過程的開銷,從而縮減了系統(tǒng)整體響應(yīng)時(shí)間。
  3. 統(tǒng)?的連接管理:避免數(shù)據(jù)庫(kù)連接泄露,在較為完備的數(shù)據(jù)庫(kù)連接池實(shí)現(xiàn)中,可根據(jù)預(yù)先的連接占用超時(shí)設(shè)定,強(qiáng)制收回被占用連接。從而避免了常規(guī)數(shù)據(jù)庫(kù)連接操作中可能出現(xiàn)的資源泄露。

如果不使用連接池

  1. TCP建立連接的三次握手(客戶端與MySQL服務(wù)器的連接基于TCP協(xié)議)
  2. MySQL認(rèn)證的三次握手
  3. 真正的SQL執(zhí)行
  4. MySQL的關(guān)閉
  5. TCP的四次握手關(guān)閉

可以看到,為了執(zhí)行?條SQL,需要進(jìn)行TCP三次握手,Mysql認(rèn)證、Mysql關(guān)閉、TCP四次揮手等其他操作,執(zhí)行SQL操作在所有的操作占比非常低。

圖片

優(yōu)點(diǎn):實(shí)現(xiàn)簡(jiǎn)單

缺點(diǎn):

  • 網(wǎng)絡(luò)IO較多
  • 帶寬利用率低
  • QPS較低
  • 應(yīng)用頻繁低創(chuàng)建連接和關(guān)閉連接,導(dǎo)致臨時(shí)對(duì)象較多,帶來更多的內(nèi)存碎片
  • 在關(guān)閉連接后,會(huì)出現(xiàn)大量TIME_WAIT 的TCP狀態(tài)(在2個(gè)MSL之后關(guān)閉)

相關(guān)視頻推薦

150行代碼,帶你手寫線程池,自行準(zhǔn)備linux環(huán)境

高并發(fā)技術(shù)之?dāng)?shù)據(jù)庫(kù)連接池設(shè)計(jì)與實(shí)現(xiàn)

網(wǎng)絡(luò)原理tcp/udp,網(wǎng)絡(luò)編程epoll/reactor

需要C/C++ Linux服務(wù)器架構(gòu)師學(xué)習(xí)資料加qun812855908獲?。?a href="http://wenjunhu.com/soft/special/" target="_blank">資料包括C/C++,Linux,golang技術(shù),Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協(xié)程,DPDK,ffmpeg等),免費(fèi)分享

圖片

使用連接池

第?次訪問的時(shí)候,需要建立連接。但是之后的訪問,均會(huì)復(fù)用之前創(chuàng)建的連接,直接執(zhí)行SQL語句。

圖片

優(yōu)點(diǎn):

  1. 降低了網(wǎng)絡(luò)開銷
  2. 連接復(fù)用,有效減少連接數(shù)。
  3. 提升性能,避免頻繁的新建連接。新建連接的開銷比較大
  4. 沒有TIME_WAIT狀態(tài)的問題

缺點(diǎn):

  1. 設(shè)計(jì)較為復(fù)雜

長(zhǎng)連接和連接池的區(qū)別

  • 長(zhǎng)連接是?些驅(qū)動(dòng)、驅(qū)動(dòng)框架、ORM工具的特性,由驅(qū)動(dòng)來保持連接句柄的打開,以便后續(xù)的數(shù)據(jù)庫(kù)操作可以重用連接,從而減少數(shù)據(jù)庫(kù)的連接開銷。
  • 而連接池是應(yīng)用服務(wù)器的組件,它可以通過參數(shù)來配置連接數(shù)、連接檢測(cè)、連接的生命周期等。
  • 連接池內(nèi)的連接,其實(shí)就是長(zhǎng)連接。

數(shù)據(jù)庫(kù)連接池運(yùn)行機(jī)制

  1. 用戶發(fā)送請(qǐng)求,把請(qǐng)求插入到消息隊(duì)列
  2. 線程池中的線程競(jìng)爭(zhēng)從消息隊(duì)列拿出任務(wù)(涉及多線程競(jìng)爭(zhēng),加鎖)
  3. 線程從連接池獲取或創(chuàng)建可用連接(涉及多線程競(jìng)爭(zhēng),加鎖)
  4. 利用連接對(duì)象和用戶請(qǐng)求任務(wù)請(qǐng)求數(shù)據(jù)庫(kù)數(shù)據(jù)
  5. 使用完畢之后,把連接返回給連接池 (涉及多線程競(jìng)爭(zhēng),加鎖)

在系統(tǒng)關(guān)閉前,斷開所有連接并釋放連接占用的系統(tǒng)資源;

圖片

連接池和線程池的關(guān)系

連接池和線程池的區(qū)別:

  • 線程池:主動(dòng)調(diào)用任務(wù)。當(dāng)任務(wù)隊(duì)列不為空的時(shí)候從隊(duì)列取任務(wù)取執(zhí)行。比如去銀行辦理業(yè)務(wù),窗口柜員是線程,多個(gè)窗口組成了線程池,柜員從排號(hào)隊(duì)列叫號(hào)執(zhí)行。
  • 連接池:被動(dòng)被任務(wù)使用。當(dāng)某任務(wù)需要操作數(shù)據(jù)庫(kù)時(shí),只要從連接池中取出?個(gè)連接對(duì)象,當(dāng)任務(wù)使用完該連接對(duì)象后,將該連接對(duì)象放回到連接池中。如果連接池中沒有連接對(duì)象可以用,那么該任務(wù)就必須等待。比如去銀行用筆填單,筆是連接對(duì)象,我們要用筆的時(shí)候去取,用完了還回去

連接池和線程池設(shè)置數(shù)量的關(guān)系:

  • ?般線程池線程數(shù)量和連接池連接對(duì)象數(shù)量?致;
  • ?般線程執(zhí)行任務(wù)完畢的時(shí)候歸還連接對(duì)象;

線程池設(shè)計(jì)要點(diǎn)

使?連接池需要預(yù)先建立數(shù)據(jù)庫(kù)連接

線程池設(shè)計(jì)思路:

  1. 連接到數(shù)據(jù)庫(kù),涉及到數(shù)據(jù)庫(kù)ip、端口、用戶名、密碼、數(shù)據(jù)庫(kù)名字等;a. 連接的操作,每個(gè)連接對(duì)象都是獨(dú)立的連接通道,它們是獨(dú)立的
    b. 配置最小連接數(shù)和最大連接數(shù)
  2. 需要?個(gè)隊(duì)列管理他的連接,比如使用list;
  3. 獲取連接對(duì)象
  4. 歸還連接對(duì)象

(同步方式)連接池的實(shí)現(xiàn)(偽代碼)

#include

//數(shù)據(jù)庫(kù)連接類(一個(gè)對(duì)象對(duì)應(yīng)一個(gè)Mysql/Redis連接)
class CDBConn {
int Init(); //初始化,連接數(shù)據(jù)庫(kù)操作
MYSQL* m_mysql; // 對(duì)應(yīng)一個(gè)連接
};

//連接池
class CDBPool {
int Init(); // 連接數(shù)據(jù)庫(kù),用于for循環(huán)創(chuàng)建CDBConn對(duì)象并且調(diào)用CDBConn->Init()
CDBConn* GetDBConn(const int timeout_ms = -1); // 獲取連接資源(即從m_free_list拿出一個(gè)連接對(duì)象)
void RelDBConn(CDBConn* pConn); // 歸還連接資源(即把連接對(duì)象放回m_free_list)
list m_free_list; // 空閑的連接
list m_used_list; // 記錄已經(jīng)被請(qǐng)求的連接
};

//用戶請(qǐng)求的任務(wù)
struct job
{
void* (*callback_function)(void *arg); //線程回調(diào)函數(shù)
void *arg; //回調(diào)函數(shù)參數(shù)
struct job *next;
};

//線程池
struct threadpool{
//用戶請(qǐng)求的任務(wù)插入到j(luò)ob list中
struct job *head; //指向job的頭指針
struct job *tail; //指向job的尾指針

//工作線程
int thread_num; //線程池中開啟線程的個(gè)數(shù)
pthread_t *pthreads; //線程池中所有線程的pthread_t
};


//工作線程執(zhí)行的函數(shù)
void *threadpool_function(void *arg){
while(1){
//從消息隊(duì)列中取任務(wù)
task = pop_task();
//從連接池取一個(gè)數(shù)據(jù)庫(kù)連接對(duì)象
CDBConn *pDBConn = pDBPool->GetDBConn();
//請(qǐng)求數(shù)據(jù)庫(kù)(同步方式,阻塞等待數(shù)據(jù)庫(kù)消息返回)
query_db(pDBConn, task);
//把數(shù)據(jù)庫(kù)連接對(duì)象放回連接池
pDBPool->RelDBConn(pDBConn);
}
}*>*>

連接池連接設(shè)置數(shù)量

連接數(shù) = ((核心數(shù) * 2) + 有效磁盤數(shù))

按照這個(gè)公式,即是說你的服務(wù)器 CPU 是 4核 i7 的,那連接池連接數(shù)大小應(yīng)該為 ((4*2)+1)=9

這里只是?個(gè)經(jīng)驗(yàn)公式。還要和線程池?cái)?shù)量以及具體業(yè)務(wù)結(jié)合在?起

線程池

服務(wù)端epoll三種處理客戶端信息方法模型:

while(1){
int n = epoll_wait();
for(n){
#if //寫法一 網(wǎng)絡(luò)線程處理解析以及業(yè)務(wù)邏輯后直接發(fā)給客戶端(單線程服務(wù)端)
recv(fd, buffer, length, 0);
parser();
send();
#elseif //寫法二:網(wǎng)絡(luò)線程把收到fd交給工作線程處理解析以及業(yè)務(wù)邏輯和發(fā)給客戶端(多線程服務(wù)器)
//該模式有缺點(diǎn):可能存在多個(gè)線程同時(shí)對(duì)一個(gè)fd進(jìn)行操作!
//場(chǎng)景:同一個(gè)客戶端短時(shí)間內(nèi)發(fā)來多條請(qǐng)求,被分給了多個(gè)不同的線程處理,那么就出現(xiàn)多個(gè)線程同時(shí)對(duì)一個(gè)fd操作的情況。如果線程一個(gè)對(duì)fd寫,另一個(gè)線程對(duì)fd進(jìn)行close,就會(huì)引發(fā)錯(cuò)誤
//因此需要特殊處理。處理方法:加入?yún)f(xié)程。每個(gè)協(xié)程處理一個(gè)IO。但是底層依然是依賴于epoll管理所有IO
task = fd;
push_tasks(task);
#else //寫法三:網(wǎng)絡(luò)線程解析完信息后,交給工作線程處理業(yè)務(wù)邏輯和發(fā)給客戶端(多線程服務(wù)器)
recv(fd, buffer, length, 0);
push_task(buffer);
#endif
}
}

線程池圖示

圖片

線程池代碼演示

#include
#include
#include

#include



#define LL_ADD(item, list) do {
item->prev = NULL;
item->next = list;
list = item;
} while(0)

#define LL_REMOVE(item, list) do {
if (item->prev != NULL) item->prev->next = item->next;
if (item->next != NULL) item->next->prev = item->prev;
if (list == item) list = item->next;
item->prev = item->next = NULL;
} while(0)


typedef struct NWORKER {
pthread_t thread;
int terminate;
struct NWORKQUEUE *workqueue;
struct NWORKER *prev;
struct NWORKER *next;
} nWorker;

typedef struct NJOB {
void (*job_function)(struct NJOB *job);
void *user_data;
struct NJOB *prev;
struct NJOB *next;
} nJob;

typedef struct NWORKQUEUE {
struct NWORKER *workers;
struct NJOB *waiting_jobs;
pthread_mutex_t jobs_mtx;
pthread_cond_t jobs_cond;
} nWorkQueue;

typedef nWorkQueue nThreadPool;

static void *ntyWorkerThread(void *ptr) {
nWorker *worker = (nWorker*)ptr;

while (1) {
pthread_mutex_lock(&worker->workqueue->jobs_mtx);

while (worker->workqueue->waiting_jobs == NULL) {
if (worker->terminate) break;
pthread_cond_wait(&worker->workqueue->jobs_cond, &worker->workqueue->jobs_mtx);
}

if (worker->terminate) {
pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
break;
}

nJob *job = worker->workqueue->waiting_jobs;
if (job != NULL) {
LL_REMOVE(job, worker->workqueue->waiting_jobs);
}

pthread_mutex_unlock(&worker->workqueue->jobs_mtx);

if (job == NULL) continue;

job->job_function(job);
}

free(worker);
pthread_exit(NULL);
}



int ntyThreadPoolCreate(nThreadPool *workqueue, int numWorkers) {

if (numWorkers < 1) numWorkers = 1;
memset(workqueue, 0, sizeof(nThreadPool));

pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
memcpy(&workqueue->jobs_cond, &blank_cond, sizeof(workqueue->jobs_cond));

pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
memcpy(&workqueue->jobs_mtx, &blank_mutex, sizeof(workqueue->jobs_mtx));

int i = 0;
for (i = 0;i < numWorkers;i ++) {
nWorker *worker = (nWorker*)malloc(sizeof(nWorker));
if (worker == NULL) {
perror("malloc");
return 1;
}

memset(worker, 0, sizeof(nWorker));
worker->workqueue = workqueue;

int ret = pthread_create(&worker->thread, NULL, ntyWorkerThread, (void *)worker);
if (ret) {

perror("pthread_create");
free(worker);

return 1;
}

LL_ADD(worker, worker->workqueue->workers);
}

return 0;
}


void ntyThreadPoolShutdown(nThreadPool *workqueue) {
nWorker *worker = NULL;

for (worker = workqueue->workers;worker != NULL;worker = worker->next) {
worker->terminate = 1;
}

pthread_mutex_lock(&workqueue->jobs_mtx);

workqueue->workers = NULL;
workqueue->waiting_jobs = NULL;

pthread_cond_broadcast(&workqueue->jobs_cond);

pthread_mutex_unlock(&workqueue->jobs_mtx);

}

void ntyThreadPoolQueue(nThreadPool *workqueue, nJob *job) {

pthread_mutex_lock(&workqueue->jobs_mtx);

LL_ADD(job, workqueue->waiting_jobs);

pthread_cond_signal(&workqueue->jobs_cond);
pthread_mutex_unlock(&workqueue->jobs_mtx);

}




/************************** debug thread pool **************************/
//sdk --> software develop kit
// 提供SDK給其他開發(fā)者使用

#if 1

#define KING_MAX_THREAD 80
#define KING_COUNTER_SIZE 1000

void king_counter(nJob *job) {

int index = *(int*)job->user_data;

printf("index : %d, selfid : %lun", index, pthread_self());

free(job->user_data);
free(job);
}



int main(int argc, char *argv[]) {

nThreadPool pool;

ntyThreadPoolCreate(&pool, KING_MAX_THREAD);

int i = 0;
for (i = 0;i < KING_COUNTER_SIZE;i ++) {
nJob *job = (nJob*)malloc(sizeof(nJob));
if (job == NULL) {
perror("malloc");
exit(1);
}

job->job_function = king_counter;
job->user_data = malloc(sizeof(int));
*(int*)job->user_data = i;

ntyThreadPoolQueue(&pool, job);

}

getchar();
printf("n");


}

#endif

內(nèi)存池

為什么要用內(nèi)存池:

  1. 在需要堆內(nèi)存管理一些數(shù)據(jù)的時(shí)候直接malloc,容易造成內(nèi)存碎片
  2. 在需要堆內(nèi)存管理一些數(shù)據(jù)的時(shí)候直接malloc,容易忘記free,造成內(nèi)存泄漏,利于內(nèi)存管理

策略

  1. 小塊內(nèi)存(<4k):先分配一個(gè)整塊,在整塊里每次用一小塊內(nèi)存
  2. 大塊內(nèi)存(>4k):直接分配

圖示

圖片

代碼示例

#include
#include
#include
#include
#include

#define MP_ALIGNMENT 32
#define MP_PAGE_SIZE 4096
#define MP_MAX_ALLOC_FROM_POOL (MP_PAGE_SIZE-1)

#define mp_align(n, alignment) (((n)+(alignment-1)) & ~(alignment-1))
#define mp_align_ptr(p, alignment) (void *)((((size_t)p)+(alignment-1)) & ~(alignment-1))

struct mp_large_s {
struct mp_large_s *next; //下個(gè)內(nèi)存結(jié)點(diǎn)
void *alloc; //分配的內(nèi)存的頭位置
};

struct mp_node_s {

unsigned char *last; //內(nèi)存結(jié)點(diǎn)末尾位置
unsigned char *end; //內(nèi)存結(jié)點(diǎn)已分配內(nèi)存的末尾位置

struct mp_node_s *next; //下一個(gè)內(nèi)存結(jié)點(diǎn)
size_t failed; //嘗試?yán)么私Y(jié)點(diǎn)剩余空間失敗次數(shù)
};

struct mp_pool_s {
//該內(nèi)存池組織了大塊內(nèi)存和小塊內(nèi)存 二者分配方式不一樣

size_t max; //MP_PAGE_SIZE

struct mp_large_s *large; //大塊內(nèi)存
struct mp_node_s *current; //小塊內(nèi)存 當(dāng)前位置

struct mp_node_s head[0]; //小塊內(nèi)存 頭節(jié)點(diǎn)位置

};

struct mp_pool_s *mp_create_pool(size_t size);
void mp_destory_pool(struct mp_pool_s *pool);
void *mp_alloc(struct mp_pool_s *pool, size_t size);
void *mp_nalloc(struct mp_pool_s *pool, size_t size);
void *mp_calloc(struct mp_pool_s *pool, size_t size);
void mp_free(struct mp_pool_s *pool, void *p);


struct mp_pool_s *mp_create_pool(size_t size) {

struct mp_pool_s *p;
int ret = posix_memalign((void **)&p, MP_ALIGNMENT, size + sizeof(struct mp_pool_s) + sizeof(struct mp_node_s));
if (ret) {
return NULL;
}

p->max = (size < MP_MAX_ALLOC_FROM_POOL) ? size : MP_MAX_ALLOC_FROM_POOL;
p->current = p->head;
p->large = NULL;

p->head->last = (unsigned char *)p + sizeof(struct mp_pool_s) + sizeof(struct mp_node_s);
p->head->end = p->head->last + size;

p->head->failed = 0;

return p;

}

void mp_destory_pool(struct mp_pool_s *pool) {

struct mp_node_s *h, *n;
struct mp_large_s *l;

for (l = pool->large; l; l = l->next) {
if (l->alloc) {
free(l->alloc);
}
}

h = pool->head->next;

while (h) {
n = h->next;
free(h);
h = n;
}

free(pool);

}

void mp_reset_pool(struct mp_pool_s *pool) {

struct mp_node_s *h;
struct mp_large_s *l;

for (l = pool->large; l; l = l->next) {
if (l->alloc) {
free(l->alloc);
}
}

pool->large = NULL;

for (h = pool->head; h; h = h->next) {
h->last = (unsigned char *)h + sizeof(struct mp_node_s);
}

}

static void *mp_alloc_block(struct mp_pool_s *pool, size_t size) {

unsigned char *m;
struct mp_node_s *h = pool->head;
size_t psize = (size_t)(h->end - (unsigned char *)h);

int ret = posix_memalign((void **)&m, MP_ALIGNMENT, psize);
if (ret) return NULL;

struct mp_node_s *p, *new_node, *current;
new_node = (struct mp_node_s*)m;

new_node->end = m + psize;
new_node->next = NULL;
new_node->failed = 0;

m += sizeof(struct mp_node_s);
m = mp_align_ptr(m, MP_ALIGNMENT);
new_node->last = m + size;

current = pool->current;

for (p = current; p->next; p = p->next) {
if (p->failed++ > 4) {
current = p->next;
}
}
p->next = new_node;

pool->current = current ? current : new_node;

return m;

}

static void *mp_alloc_large(struct mp_pool_s *pool, size_t size) {

void *p = malloc(size);
if (p == NULL) return NULL;

size_t n = 0;
struct mp_large_s *large;
for (large = pool->large; large; large = large->next) {
if (large->alloc == NULL) {
large->alloc = p;
return p;
}
if (n ++ > 3) break;
}

// 把 mp_large_s 結(jié)構(gòu)體 放到 小塊內(nèi)存的結(jié)點(diǎn)里
large = mp_alloc(pool, sizeof(struct mp_large_s));
if (large == NULL) {
free(p);
return NULL;
}

large->alloc = p;
large->next = pool->large;
pool->large = large;

return p;
}

void *mp_memalign(struct mp_pool_s *pool, size_t size, size_t alignment) {

void *p;

int ret = posix_memalign(&p, alignment, size);
if (ret) {
return NULL;
}

struct mp_large_s *large = mp_alloc(pool, sizeof(struct mp_large_s));
if (large == NULL) {
free(p);
return NULL;
}

large->alloc = p;
large->next = pool->large;
pool->large = large;

return p;
}




void *mp_alloc(struct mp_pool_s *pool, size_t size) {

unsigned char *m;
struct mp_node_s *p;

if (size <= pool->max) {

p = pool->current;

do {

m = mp_align_ptr(p->last, MP_ALIGNMENT);
if ((size_t)(p->end - m) >= size) {
p->last = m + size;
return m;
}
p = p->next;
} while (p);

return mp_alloc_block(pool, size);
}

return mp_alloc_large(pool, size);

}


void *mp_nalloc(struct mp_pool_s *pool, size_t size) {

unsigned char *m;
struct mp_node_s *p;

if (size <= pool->max) {
p = pool->current;

do {
m = p->last;
if ((size_t)(p->end - m) >= size) {
p->last = m+size;
return m;
}
p = p->next;
} while (p);

return mp_alloc_block(pool, size);
}

return mp_alloc_large(pool, size);

}

void *mp_calloc(struct mp_pool_s *pool, size_t size) {

void *p = mp_alloc(pool, size);
if (p) {
memset(p, 0, size);
}

return p;

}

void mp_free(struct mp_pool_s *pool, void *p) {

struct mp_large_s *l;
for (l = pool->large; l; l = l->next) {
if (p == l->alloc) {
free(l->alloc);
l->alloc = NULL;

return ;
}
}

}


int main(int argc, char *argv[]) {

int size = 1 << 12;

struct mp_pool_s *p = mp_create_pool(size);

int i = 0;
for (i = 0;i < 10;i ++) {

void *mp = mp_alloc(p, 512);
// mp_free(mp);
}

//printf("mp_create_pool: %ldn", p->max);
printf("mp_align(123, 32): %d, mp_align(17, 32): %dn", mp_align(24, 32), mp_align(17, 32));
//printf("mp_align_ptr(p->current, 32): %lx, p->current: %lx, mp_align(p->large, 32): %lx, p->large: %lxn", mp_align_ptr(p->current, 32), p->current, mp_align_ptr(p->large, 32), p->large);

int j = 0;
for (i = 0;i < 5;i ++) {

char *pp = mp_calloc(p, 32);
for (j = 0;j < 32;j ++) {
if (pp[j]) {
printf("calloc wrongn");
}
printf("calloc successn");
}
}

//printf("mp_reset_pooln");

for (i = 0;i < 5;i ++) {
void *l = mp_alloc(p, 8192);
mp_free(p, l);
}

mp_reset_pool(p);

//printf("mp_destory_pooln");
for (i = 0;i < 58;i ++) {
mp_alloc(p, 256);
}

mp_destory_pool(p);

return 0;

}

異步請(qǐng)求池

同步請(qǐng)求要點(diǎn):

請(qǐng)求方作為客戶端請(qǐng)求后(send/sendto),立即調(diào)用(recv/recvfrom)阻塞等待結(jié)果返回。

圖片

int dns_client_commit(const char *domain) {

int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0) {
perror("create socket failedn");
exit(-1);
}

printf("url:%sn", domain);

struct sockaddr_in dest;
bzero(&dest, sizeof(dest));
dest.sin_family = AF_INET;
dest.sin_port = htons(53);
dest.sin_addr.s_addr = inet_addr(DNS_SVR);

int ret = connect(sockfd, (struct sockaddr*)&dest, sizeof(dest));
printf("connect :%dn", ret);

struct dns_header header = {0};
dns_create_header(&header);

struct dns_question question = {0};
dns_create_question(&question, domain);

char request[1024] = {0};
int req_len = dns_build_request(&header, &question, request);
int slen = sendto(sockfd, request, req_len, 0, (struct sockaddr*)&dest, sizeof(struct sockaddr));

char buffer[1024] = {0};
struct sockaddr_in addr;
size_t addr_len = sizeof(struct sockaddr_in);

int n = recvfrom(sockfd, buffer, sizeof(buffer), 0, (struct sockaddr*)&addr, (socklen_t*)&addr_len);

printf("recvfrom n : %dn", n);
struct dns_item *domains = NULL;
dns_parse_response(buffer, &domains);

return 0;
}

異步請(qǐng)求要點(diǎn):

  1. 請(qǐng)求方作為客戶端請(qǐng)求后(send/sendto),把fd交給epoll管理,不等待結(jié)果返回(recv/recvfrom)。
  2. epoll_wait在一個(gè)線程死循環(huán)中,當(dāng)epoll收到消息,在進(jìn)行處理(recv/recvfrom)。

圖片

typedef void (*async_result_cb)(struct dns_item *arg, int count);

struct async_context {
int epfd;
pthread_t threadid;
};

struct ep_arg {
int sockfd;
async_result_cb cb;
};


#define ASYNC_EVENTS 128

void *dns_async_callback(void *arg) {

struct async_context* ctx = (struct async_context*)arg;

while (1) {
struct epoll_event events[ASYNC_EVENTS] = {0};

int nready = epoll_wait(ctx->epfd, events, ASYNC_EVENTS, -1);
if (nready < 0) {
continue;
}

int i = 0;
for (i = 0;i < nready;i ++) {

struct ep_arg *ptr = events[i].data.ptr;
int sockfd = ptr->sockfd;

char buffer[1024] = {0};
struct sockaddr_in addr;
size_t addr_len = sizeof(struct sockaddr_in);

int n = recvfrom(sockfd, buffer, sizeof(buffer), 0, (struct sockaddr*)&addr, (socklen_t*)&addr_len);

//協(xié)議解析
//這里是DNS協(xié)議解析、也可以換成Mysql、Redis協(xié)議解析等。
printf("recvfrom n : %dn", n);
struct dns_item *domains = NULL;
int count = dns_parse_response(buffer, &domains);

//執(zhí)行回調(diào)函數(shù)
ptr->cb(domains, count);

// close sockfd
close (sockfd);
free(ptr);
epoll_ctl(ctx->epfd, EPOLL_CTL_DEL, sockfd, NULL);
}

}
}

struct async_context* dns_async_client_init(void) {

int epfd = epoll_create(1);
if (epfd < 0) return NULL;

struct async_context* ctx = calloc(1, sizeof(struct async_context));
if (ctx == NULL) return NULL;

ctx->epfd = epfd;

int ret = pthread_create(&ctx->threadid, NULL, dns_async_callback, ctx);
if (ret) {
close(epfd);
free(ctx);
return NULL;
}

return ctx;
}

int dns_async_client_destroy(struct async_context* ctx) {
close(ctx->epfd);
pthread_cancel(ctx->threadid);
}

int dns_async_client_commit(struct async_context *ctx, async_result_cb cb) {

int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0) {
perror("create socket failedn");
exit(-1);
}

printf("url:%sn", domain);

struct sockaddr_in dest;
bzero(&dest, sizeof(dest));
dest.sin_family = AF_INET;
dest.sin_port = htons(53);
dest.sin_addr.s_addr = inet_addr(DNS_SVR);

int ret = connect(sockfd, (struct sockaddr*)&dest, sizeof(dest));
printf("connect :%dn", ret);

struct dns_header header = {0};
dns_create_header(&header);

struct dns_question question = {0};
dns_create_question(&question, domain);

char request[1024] = {0};
int req_len = dns_build_request(&header, &question, request);
int slen = sendto(sockfd, request, req_len, 0, (struct sockaddr*)&dest, sizeof(struct sockaddr));

struct ep_arg *ptr = calloc(1, sizeof(struct ep_arg));
if (ptr == NULL) return -1;
ptr->sockfd = sockfd;
ptr->cb = cb;

//
struct epoll_event ev;
ev.data.ptr = ptr;
ev.events = EPOLLIN; //可讀
epoll_ctl(ctx->epfd, EPOLL_CTL_ADD, sockfd, &ev);

return 0;
}

執(zhí)行順序:

  1. 調(diào)用dns_async_client_init創(chuàng)建epoll和處理epoll_wait對(duì)應(yīng)的線程。
  2. 調(diào)用dns_async_client_commit提交請(qǐng)求(send/sendto),并且把對(duì)應(yīng)fd交給epoll管理
  3. 在dns_async_callback線程中,死循環(huán)中epoll_wait檢測(cè)到EPOLLIN可讀事件,然后調(diào)用(recv/recvfrom)和callback函數(shù)處理請(qǐng)求返回的response事件。
聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 服務(wù)器
    +關(guān)注

    關(guān)注

    12

    文章

    9160

    瀏覽量

    85421
  • 數(shù)據(jù)庫(kù)
    +關(guān)注

    關(guān)注

    7

    文章

    3799

    瀏覽量

    64389
  • 程序
    +關(guān)注

    關(guān)注

    117

    文章

    3787

    瀏覽量

    81043
  • Redis
    +關(guān)注

    關(guān)注

    0

    文章

    375

    瀏覽量

    10877
收藏 人收藏

    評(píng)論

    相關(guān)推薦

    關(guān)于數(shù)據(jù)庫(kù)連接池總結(jié)

    數(shù)據(jù)庫(kù)連接池小結(jié)
    發(fā)表于 04-30 08:07

    線程是如何實(shí)現(xiàn)的

    線程的概念是什么?線程是如何實(shí)現(xiàn)的?
    發(fā)表于 02-28 06:20

    關(guān)于RT-Thread內(nèi)存管理的內(nèi)存簡(jiǎn)析

    ,一個(gè)線程試圖從內(nèi)存申請(qǐng)內(nèi)存塊,一個(gè)線程釋放內(nèi)存塊。示例代碼如下:編譯運(yùn)行結(jié)果如下:其他管理函
    發(fā)表于 04-06 17:02

    連接池工作原理

    連接池技術(shù)的核心思想是連接復(fù)用,通過建立一個(gè)數(shù)據(jù)庫(kù)連接池以及一套連接使用、分配和管理策略,使得該連接池中的
    的頭像 發(fā)表于 03-22 16:16 ?7689次閱讀

    數(shù)據(jù)庫(kù)連接池的設(shè)置怎么確定大小

    數(shù)據(jù)庫(kù)連接池的配置是開發(fā)者們常常搞出坑的地方,在配置數(shù)據(jù)庫(kù)連接池時(shí),有幾個(gè)可以說是和直覺背道而馳的原則需要明確。
    的頭像 發(fā)表于 05-04 14:23 ?2746次閱讀
    數(shù)據(jù)庫(kù)<b class='flag-5'>連接池</b>的設(shè)置怎么確定大小

    線程線程

    線程通常用于服務(wù)器應(yīng)用程序。 每個(gè)傳入請(qǐng)求都將分配給線程池中的一個(gè)線程,因此可以異步處理
    的頭像 發(fā)表于 02-28 09:53 ?794次閱讀
    多<b class='flag-5'>線程</b>之<b class='flag-5'>線程</b><b class='flag-5'>池</b>

    什么是內(nèi)存

    使用時(shí)就會(huì)變得非常快 捷,大大提高程序運(yùn)行效率。 在計(jì)算機(jī)中,有很多使用“”這種技術(shù)的地方,除了內(nèi)存,還有連接池、線程
    的頭像 發(fā)表于 11-08 16:26 ?905次閱讀
    什么是<b class='flag-5'>內(nèi)存</b><b class='flag-5'>池</b>

    高并發(fā)內(nèi)存項(xiàng)目實(shí)現(xiàn)

    本項(xiàng)目實(shí)現(xiàn)了一個(gè)高并發(fā)內(nèi)存,參考了Google的開源項(xiàng)目tcmalloc實(shí)現(xiàn)的簡(jiǎn)易版;其功能就是實(shí)現(xiàn)高效的多線程內(nèi)存管理。由功能可知,高并發(fā)指的是高效的多
    的頭像 發(fā)表于 11-09 11:16 ?722次閱讀
    高并發(fā)<b class='flag-5'>內(nèi)存</b><b class='flag-5'>池</b>項(xiàng)目實(shí)現(xiàn)

    線程基本概念與原理

    一、線程基本概念與原理 1.1 線程概念及優(yōu)勢(shì) C++線程簡(jiǎn)介
    的頭像 發(fā)表于 11-10 10:24 ?537次閱讀

    線程的基本概念

    線程的基本概念 不管線程是什么東西!但是我們必須知道線程被搞出來的目的就是:提高程序執(zhí)行效
    的頭像 發(fā)表于 11-10 16:37 ?526次閱讀
    <b class='flag-5'>線程</b><b class='flag-5'>池</b>的基本概念

    內(nèi)存的使用場(chǎng)景

    山中,非常容易出現(xiàn)內(nèi)存泄漏導(dǎo)致mmo的問題。 為了解決這兩個(gè)問題,內(nèi)存就應(yīng)運(yùn)而生了。內(nèi)存預(yù)先
    的頭像 發(fā)表于 11-10 17:19 ?711次閱讀
    <b class='flag-5'>內(nèi)存</b><b class='flag-5'>池</b>的使用場(chǎng)景

    內(nèi)存主要解決的問題

    程 序占有的資源數(shù)量。 經(jīng)常使用的技術(shù)包括內(nèi)存、線程連接池等,其中尤以
    的頭像 發(fā)表于 11-13 15:23 ?709次閱讀
    <b class='flag-5'>內(nèi)存</b><b class='flag-5'>池</b>主要解決的問題

    化技術(shù)的應(yīng)用實(shí)踐

    作為一名Java開發(fā)人員,化技術(shù)或多或少在業(yè)務(wù)代碼中使用。常見的包括線程、連接池等。也是因?yàn)镴ava語言超級(jí)豐富的基建,基本上這些化能
    的頭像 發(fā)表于 11-24 10:22 ?526次閱讀
    <b class='flag-5'>池</b>化技術(shù)的應(yīng)用實(shí)踐

    線程的運(yùn)轉(zhuǎn)流程圖 化技術(shù)實(shí)踐案例解析

    作為一名Java開發(fā)人員,化技術(shù)或多或少在業(yè)務(wù)代碼中使用。常見的包括線程連接池等。也是因?yàn)镴ava語言超級(jí)豐富的基建,基本上這些化能
    的頭像 發(fā)表于 11-24 10:22 ?471次閱讀
    <b class='flag-5'>線程</b><b class='flag-5'>池</b>的運(yùn)轉(zhuǎn)流程圖 <b class='flag-5'>池</b>化技術(shù)實(shí)踐案例解析

    什么是動(dòng)態(tài)線程?動(dòng)態(tài)線程的簡(jiǎn)單實(shí)現(xiàn)思路

    因此,動(dòng)態(tài)可監(jiān)控線程一種針對(duì)以上痛點(diǎn)開發(fā)的線程管理工具。主要可實(shí)現(xiàn)功能有:提供對(duì) Spring 應(yīng)用內(nèi)線程
    的頭像 發(fā)表于 02-28 10:42 ?645次閱讀