0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

基于多路復(fù)用模型的Netty框架

科技綠洲 ? 來源:Java技術(shù)指北 ? 作者:Java技術(shù)指北 ? 2023-09-30 11:30 ? 次閱讀

Netty

version: 4.1.55.Final

傳統(tǒng)的IO模型的web容器,比如老版本的Tomcat,為了增加系統(tǒng)的吞吐量,需要不斷增加系統(tǒng)核心線程數(shù)量,或者通過水平擴(kuò)展服務(wù)器數(shù)量,來增加系統(tǒng)處理請(qǐng)求的能力。 有了NIO之后,一個(gè)線程即可處理多個(gè)連接事件,基于多路復(fù)用模型的Netty框架,不僅降低了使用NIO的復(fù)雜度,

優(yōu)點(diǎn)

Netty是一款以java NIO為基礎(chǔ),基于事件驅(qū)動(dòng)模型支持異步、高并發(fā)的網(wǎng)絡(luò)應(yīng)用框架

  • API使用簡單,開發(fā)門檻低,簡化了NIO開發(fā)網(wǎng)絡(luò)程序的復(fù)雜度
  • 功能強(qiáng)大,預(yù)置多種編解碼功能,支持多種主流協(xié)議,比如Http、WebSocket。
  • 定制能力強(qiáng),可以通過ChannelHandler對(duì)通信框架靈活擴(kuò)展。
  • 性能高,支持異步非阻塞通信模型
  • 成熟穩(wěn)定,社區(qū)活躍,已經(jīng)修復(fù)了Java NIO所有的Bug。
  • 經(jīng)歷了大規(guī)模商業(yè)應(yīng)用的考驗(yàn),質(zhì)量有保證。

IO模型

select、poll和epoll

操作系統(tǒng)內(nèi)核基于這些函數(shù)實(shí)現(xiàn)非阻塞IO,以此實(shí)現(xiàn)多路復(fù)用模型

  • select

select

  1. select 調(diào)用需要傳入 fd 數(shù)組,需要拷貝一份到內(nèi)核,高并發(fā)場景下這樣的拷貝消耗的資源是驚人的。(可優(yōu)化為不復(fù)制)
  2. select 在內(nèi)核層仍然是通過遍歷的方式檢查文件描述符的就緒狀態(tài),是個(gè)同步過程,只不過無系統(tǒng)調(diào)用切換上下文的開銷。(內(nèi)核層可優(yōu)化為異步事件通知)
  3. select 僅僅返回可讀文件描述符的個(gè)數(shù),具體哪個(gè)可讀還是要用戶自己遍歷。(可優(yōu)化為只返回給用戶就緒的文件描述符,無需用戶做無效的遍歷)
  • pool

和 select 的主要區(qū)別就是,去掉了 select 只能監(jiān)聽 1024 個(gè)文件描述符的限制

  • epool

epool

  1. 內(nèi)核中保存一份文件描述符集合,無需用戶每次都重新傳入,只需告訴內(nèi)核修改的部分即可。
  2. 內(nèi)核不再通過輪詢的方式找到就緒的文件描述符,而是通過異步 IO 事件喚醒。
  3. 內(nèi)核僅會(huì)將有 IO 事件的文件描述符返回給用戶,用戶也無需遍歷整個(gè)文件描述符集合。

Reactor模型

一、單Reactor單線程 1)可以實(shí)現(xiàn)通過一個(gè)阻塞對(duì)象監(jiān)聽多個(gè)鏈接請(qǐng)求

2)Reactor對(duì)象通過select監(jiān)聽客戶端請(qǐng)求事件,通過dispatch進(jìn)行分發(fā)

3)如果是建立鏈接請(qǐng)求,則由Acceptor通過accept處理鏈接請(qǐng)求,然后創(chuàng)建一個(gè)Handler對(duì)象處理完成鏈接后的各種事件

4)如果不是鏈接請(qǐng)求,則由Reactor分發(fā)調(diào)用鏈接對(duì)應(yīng)的Handler來處理

5)Handler會(huì)完成Read->業(yè)務(wù)處理->send的完整業(yè)務(wù)流程

reactor

二、單Reactor多線程 1)Reactor對(duì)象通過select監(jiān)聽客戶端請(qǐng)求事件,收到事件后,通過dispatch分發(fā)

2)如果是建立鏈接請(qǐng)求,則由Acceptor通過accept處理鏈接請(qǐng)求,然后創(chuàng)建一個(gè)Handler對(duì)象處理完成鏈接后的各種事件

3)如果不是鏈接請(qǐng)求,則由Reactor分發(fā)調(diào)用鏈接對(duì)應(yīng)的Handler來處理

4)Handler只負(fù)責(zé)事件響應(yīng)不做具體業(yè)務(wù)處理

5)通過read讀取數(shù)據(jù)后,分發(fā)到worker線程池處理,處理完成后返回給Handler,Handler收到后,通過send將結(jié)果返回給client

reactor

三、主從Reactor多線程 1)Reactor主線程MainReactor對(duì)象通過select監(jiān)聽鏈接事件,通過Acceptor處理

2)當(dāng)Acceptor處理鏈接事件后,MainReactor將鏈接分配給SubReactor

3)SubReactor將鏈接加入到隊(duì)列進(jìn)行監(jiān)聽,并創(chuàng)建Handler進(jìn)行事件處理

4)當(dāng)有新事件發(fā)生時(shí),SubReactor就會(huì)調(diào)用對(duì)應(yīng)的Handler處理

5)Handler通過read讀取數(shù)據(jù),分發(fā)到worker線程池處理,處理完成后返回給Handler,Handler收到后,通過send將結(jié)果返回給client

6)Reactor主線程可以對(duì)應(yīng)多個(gè)Reactor子線程

reactor

三種模式用生活案例來理解 1)單Reactor單線程,前臺(tái)接待員和服務(wù)員是同一個(gè)人,全程為顧客服務(wù)

2)單Reactor多線程,1個(gè)前臺(tái)接待員,多個(gè)服務(wù)員,接待員只負(fù)責(zé)接待

3)主從Reactor多線程,多個(gè)前臺(tái)接待員,多個(gè)服務(wù)員

Reactor模型具有如下優(yōu)點(diǎn) 1)響應(yīng)快,不必為單個(gè)同步事件所阻塞,雖然Reactor本身依然是同步的

2)可以最大程度的避免復(fù)雜的多線程及同步問題,并且避免了多線程/進(jìn)程的切換開銷

3)擴(kuò)展性好,可以方便的通過增加Reactor實(shí)例個(gè)數(shù)來充分利用CPU資源

4)復(fù)用性好,Reactor模型本身與具體事件處理邏輯無關(guān),具有很高的復(fù)用性

核心組件

1.Bootstrap 一個(gè)Netty應(yīng)用通常由一個(gè)Bootstrap開始,它主要作用是配置整個(gè)Netty程序,串聯(lián)起各個(gè)組件。

Handler,為了支持各種協(xié)議和處理數(shù)據(jù)的方式,便誕生了Handler組件。Handler主要用來處理各種事件,這里的事件很廣泛,比如可以是連接、數(shù)據(jù)接收、異常、數(shù)據(jù)轉(zhuǎn)換等。

2.ChannelInboundHandler 一個(gè)最常用的Handler。這個(gè)Handler的作用就是處理接收到數(shù)據(jù)時(shí)的事件,也就是說,我們的業(yè)務(wù)邏輯一般就是寫在這個(gè)Handler里面的,ChannelInboundHandler就是用來處理我們的核心業(yè)務(wù)邏輯。

3.ChannelInitializer 當(dāng)一個(gè)鏈接建立時(shí),我們需要知道怎么來接收或者發(fā)送數(shù)據(jù),當(dāng)然,我們有各種各樣的Handler實(shí)現(xiàn)來處理它,那么ChannelInitializer便是用來配置這些Handler,它會(huì)提供一個(gè)ChannelPipeline,并把Handler加入到ChannelPipeline。

4.ChannelPipeline 一個(gè)Netty應(yīng)用基于ChannelPipeline機(jī)制,這種機(jī)制需要依賴于EventLoop和EventLoopGroup,因?yàn)樗鼈內(nèi)齻€(gè)都和事件或者事件處理相關(guān)。

EventLoops的目的是為Channel處理IO操作,一個(gè)EventLoop可以為多個(gè)Channel服務(wù)。

EventLoopGroup會(huì)包含多個(gè)EventLoop。

5.Channel 代表了一個(gè)Socket鏈接,或者其它和IO操作相關(guān)的組件,它和EventLoop一起用來參與IO處理。

6.Future 在Netty中所有的IO操作都是異步的,因此,你不能立刻得知消息是否被正確處理,但是我們可以過一會(huì)等它執(zhí)行完成或者直接注冊(cè)一個(gè)監(jiān)聽,具體的實(shí)現(xiàn)就是通過Future和ChannelFutures,他們可以注冊(cè)一個(gè)監(jiān)聽,當(dāng)操作執(zhí)行成功或失敗時(shí)監(jiān)聽會(huì)自動(dòng)觸發(fā)。

示例

通過一個(gè)簡單的示例,首先了解怎么基于netty開發(fā)一個(gè)通信程序,包括服務(wù)的與客戶端:

Server:

@Slf4j
public class Server {

    private EventLoopGroup boosGroup;

    private EventLoopGroup workGroup;

    public Server(int port){
        try {
            init(port);
            log.info("----- 服務(wù)啟動(dòng)成功 -----");
        } catch (InterruptedException e) {
            log.error("啟動(dòng)服務(wù)出錯(cuò):{}", e.getCause());
        }
    }

    private void init(int port) throws InterruptedException {
        // 處理連接
        this.boosGroup = new NioEventLoopGroup();
        // 處理業(yè)務(wù)
        this.workGroup = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        // 綁定
        bootstrap.group(boosGroup, workGroup)
                .channel(NioServerSocketChannel.class) //配置服務(wù)端
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_RCVBUF, 1024)
                .childOption(ChannelOption.SO_SNDBUF, 1024)
                .childHandler(new ChannelInitializer< SocketChannel >() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ServerHandler());
                    }
                });

        ChannelFuture channelFuture = bootstrap.bind(port).sync();
        channelFuture.channel().closeFuture().sync();
    }

    public void close(){
        this.boosGroup.shutdownGracefully();
        this.workGroup.shutdownGracefully();
    }

}

@Slf4j
class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info(" >> >> >> > server active");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //1. 讀取客戶端的數(shù)據(jù)(緩存中去取并打印到控制臺(tái))
        ByteBuf buf = (ByteBuf) msg;
        byte[] request = new byte[buf.readableBytes()];
        buf.readBytes(request);
        String requestBody = new String(request, "utf-8");
        log.info(" >> >> >> >> > receive message: {}", requestBody);

        //2. 返回響應(yīng)數(shù)據(jù)
        ctx.writeAndFlush(Unpooled.copiedBuffer((requestBody+" too").getBytes()));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

Client:

@Slf4j
public class Client {

    private EventLoopGroup workGroup;
    private ChannelFuture channelFuture;

    public Client(int port){
        init(port);
    }

    private void init(int port){
        this.workGroup = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(workGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .option(ChannelOption.SO_RCVBUF, 1024)
                .option(ChannelOption.SO_SNDBUF, 1024)
                .handler(new ChannelInitializer< SocketChannel >() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ClientHandler());
                    }
                });

        this.channelFuture = bootstrap.connect("127.0.0.1", port).syncUninterruptibly();
    }

    /**
     *
     * @param message
     */
    public void send(String message){
        this.channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer(message.getBytes()));
    }

    /**
     *
     */
    public void close(){
        try {
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        workGroup.shutdownGracefully();
    }
}

@Slf4j
class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info(" >> >> >> > client active");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);

            String body = new String(req, "utf-8");
            log.info(" >> >> >> >> > receive message: {}", body);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

測試:

public class StarterTests {

    static int port = 9011;

    @Test
    public void startServer(){
        Server server = new Server(9011);
    }

    @Test
    public void startClient(){
        Client client = new Client(port);
        client.send("Hello Netty!");
        while (true){}
    }

}

生態(tài)

  • Dubbo
  • Spring Reactive

類似技術(shù)

Mina、Netty、Grizzly

其他

Proactor非阻塞異步網(wǎng)絡(luò)模型

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • Web
    Web
    +關(guān)注

    關(guān)注

    2

    文章

    1275

    瀏覽量

    70030
  • 框架
    +關(guān)注

    關(guān)注

    0

    文章

    403

    瀏覽量

    17617
  • 容器
    +關(guān)注

    關(guān)注

    0

    文章

    502

    瀏覽量

    22199
  • 模型
    +關(guān)注

    關(guān)注

    1

    文章

    3418

    瀏覽量

    49482
收藏 人收藏

    評(píng)論

    相關(guān)推薦

    [6.4.1]--多路復(fù)用

    多路復(fù)用數(shù)字邏輯
    李開鴻
    發(fā)布于 :2022年11月13日 01:18:45

    多路復(fù)用ICSP引腳如何控制開關(guān)?

    您好,我有一個(gè)應(yīng)用程序,我將使用PIC32MX575F512L?,F(xiàn)在,我將有一個(gè)有限數(shù)量的電線,我可以取出框,將主機(jī)的電子。因此,我需要多路復(fù)用兩條線:通常它們會(huì)產(chǎn)生一個(gè)串行端口(ttl),但是我
    發(fā)表于 04-01 08:19

    AD8183-EVAL是用于視頻路由和多路復(fù)用系統(tǒng)的多路復(fù)用器評(píng)估板

    AD8183-EVAL,用于視頻路由和多路復(fù)用系統(tǒng)的三路2:1模擬多路復(fù)用器評(píng)估板。 AD8183評(píng)估板經(jīng)過精心布局和測試,以展示器件的指定高速性能
    發(fā)表于 06-17 12:40

    如何在Mx1051的FlexCAN1中配置簡單信號(hào)多路復(fù)用和擴(kuò)展信號(hào)多路復(fù)用?

    我們正在研究 FlexCAN1 的 mxrt1051。我們是第一次在 FlexCAN 上工作,請(qǐng)協(xié)助以下幾點(diǎn): - 如何在 Mx1051 的 FlexCAN1 中配置簡單信號(hào)多路復(fù)用和擴(kuò)展信號(hào)
    發(fā)表于 05-05 11:05

    多路復(fù)用與數(shù)字復(fù)接

    多路復(fù)用與數(shù)字復(fù)接8.1 頻分多路復(fù)用(FDM)原理8.2 時(shí)分多路復(fù)用(TDM)原理8.3 準(zhǔn)同步數(shù)字體系(PDH) 8.4 同步數(shù)字體系(SDH)  
    發(fā)表于 10-22 13:26 ?0次下載

    多路復(fù)用技術(shù)

    2.3  多路復(fù)用技術(shù)2.3.1  頻分多路復(fù)用2.3.2  時(shí)分多路復(fù)用2.3.3  波分多路復(fù)用2.3.4  碼分
    發(fā)表于 06-27 21:46 ?0次下載

    基于CPLD的非多路復(fù)用多路復(fù)用總線轉(zhuǎn)換橋的設(shè)計(jì)與實(shí)現(xiàn)

    基于CPLD的非多路復(fù)用多路復(fù)用總線轉(zhuǎn)換橋的設(shè)計(jì)與實(shí)現(xiàn) 微處理器對(duì)外并行總線接口方式一般分為兩種,一種為多路復(fù)用方式,數(shù)據(jù)與地址采用共用引腳,分時(shí)傳輸;另一
    發(fā)表于 03-28 15:08 ?881次閱讀
    基于CPLD的非<b class='flag-5'>多路復(fù)用</b>與<b class='flag-5'>多路復(fù)用</b>總線轉(zhuǎn)換橋的設(shè)計(jì)與實(shí)現(xiàn)

    多路復(fù)用多路復(fù)用總線轉(zhuǎn)換橋的設(shè)計(jì)與實(shí)現(xiàn)

    多路復(fù)用多路復(fù)用總線轉(zhuǎn)換橋的設(shè)計(jì)與實(shí)現(xiàn) 提出了一種新穎的非多路復(fù)用總線與多路復(fù)用總線的轉(zhuǎn)換接口電路。以兩種總線的典型代表芯片TMS
    發(fā)表于 03-28 15:14 ?989次閱讀
    非<b class='flag-5'>多路復(fù)用</b>與<b class='flag-5'>多路復(fù)用</b>總線轉(zhuǎn)換橋的設(shè)計(jì)與實(shí)現(xiàn)

    復(fù)用器的多路復(fù)用

    復(fù)用器的多路復(fù)用  多路復(fù)用
    發(fā)表于 01-07 14:27 ?1209次閱讀

    頻分多路復(fù)用(FDM),頻分多路復(fù)用(FDM)是什么意思

    頻分多路復(fù)用(FDM),頻分多路復(fù)用(FDM)是什么意思 “復(fù)用”是一種將若干個(gè)彼此獨(dú)立的信號(hào),合并為一個(gè)可在同一信道上同時(shí)傳輸?shù)膹?fù)合信號(hào)的方法。
    發(fā)表于 03-19 14:00 ?7642次閱讀

    時(shí)分多路復(fù)用(TDM),時(shí)分多路復(fù)用(TDM)的原理是什么?

    時(shí)分多路復(fù)用(TDM),時(shí)分多路復(fù)用(TDM)的原理是什么?  為了提高信道利用率,使多個(gè)信號(hào)沿同一信道傳輸而互相不干擾,稱
    發(fā)表于 03-19 14:07 ?1.1w次閱讀

    什么是異步時(shí)分多路復(fù)用(ATDM)

    什么是異步時(shí)分多路復(fù)用(ATDM) 異步時(shí)分多路復(fù)用技術(shù) (ATDM,Asynchronism Time-Division Multiplexing)
    發(fā)表于 04-03 15:25 ?1975次閱讀

    時(shí)分多路復(fù)用(TDM),時(shí)分多路復(fù)用(TDM)是什么意思

    時(shí)分多路復(fù)用(TDM),時(shí)分多路復(fù)用(TDM)是什么意思 這種方法是把傳輸信道按時(shí)間來分割,為每個(gè)用戶指定一個(gè)時(shí)間間隔,每個(gè)間隔里傳輸信號(hào)
    發(fā)表于 04-03 15:28 ?6064次閱讀

    如何改進(jìn)開關(guān)/多路復(fù)用器LTspice模型

    如果我的模擬設(shè)計(jì)中包含開關(guān)和多路復(fù)用器,那么還能改進(jìn)開關(guān)/多路復(fù)用器LTspice模型嗎?
    的頭像 發(fā)表于 03-01 13:34 ?3700次閱讀
    如何改進(jìn)開關(guān)/<b class='flag-5'>多路復(fù)用</b>器LTspice<b class='flag-5'>模型</b>

    頻分多路復(fù)用和時(shí)分多路復(fù)用的區(qū)別有哪些

    頻分多路復(fù)用(FDM)和時(shí)分多路復(fù)用(TDM)是兩種主要的多路復(fù)用技術(shù),它們?cè)谕ㄐ畔到y(tǒng)中扮演著至關(guān)重要的角色。
    的頭像 發(fā)表于 05-07 15:24 ?3715次閱讀