來(lái)源:艾逗筆
介紹 MCP 傳輸機(jī)制
MCP 傳輸機(jī)制(Transport)是 MCP 客戶(hù)端與 MCP 服務(wù)器通信的一個(gè)橋梁,定義了客戶(hù)端與服務(wù)器通信的細(xì)節(jié),幫助客戶(hù)端和服務(wù)器交換消息。
MCP 協(xié)議使用 JSON-RPC 來(lái)編碼消息。JSON-RPC 消息必須使用 UTF-8 編碼。
MCP 協(xié)議目前定義了三種傳輸機(jī)制用于客戶(hù)端-服務(wù)器通信:
stdio:通過(guò)標(biāo)準(zhǔn)輸入和標(biāo)準(zhǔn)輸出進(jìn)行通信
SSE:通過(guò) HTTP 進(jìn)行通信,支持流式傳輸。(協(xié)議版本 2024-11-05 開(kāi)始支持,即將廢棄)
Streamble HTTP:通過(guò) HTTP 進(jìn)行通信,支持流式傳輸。(協(xié)議版本 2025-03-26 開(kāi)始支持,用于替代 SSE)
MCP 協(xié)議要求客戶(hù)端應(yīng)盡可能支持 stdio。
MCP 協(xié)議的傳輸機(jī)制是可插拔的,也就是說(shuō),客戶(hù)端和服務(wù)器不局限于 MCP 協(xié)議標(biāo)準(zhǔn)定義的這幾種傳輸機(jī)制,也可以通過(guò)自定義的傳輸機(jī)制來(lái)實(shí)現(xiàn)通信。
stdio 傳輸
stdio 即 standard input & output(標(biāo)準(zhǔn)輸入 / 輸出)。
是 MCP 協(xié)議推薦使用的一種傳輸機(jī)制,主要用于本地進(jìn)程通信。
在 stdio 傳輸中:
客戶(hù)端以子進(jìn)程的形式啟動(dòng) MCP 服務(wù)器。
服務(wù)器從其標(biāo)準(zhǔn)輸入(stdin)讀取 JSON-RPC 消息,并將消息發(fā)送到其標(biāo)準(zhǔn)輸出(stdout)。
消息可能是單個(gè) JSON-RPC 請(qǐng)求、通知、響應(yīng),或者包含多個(gè)請(qǐng)求、通知、響應(yīng)的 JSON-RPC 批處理。
消息由換行符分隔,且不得包含嵌套的換行符。
服務(wù)器可以將其 UTF-8 字符串寫(xiě)入標(biāo)準(zhǔn)錯(cuò)誤(stderr)以進(jìn)行日志記錄??蛻?hù)端可以捕獲、轉(zhuǎn)發(fā)或忽略此日志。
服務(wù)器不得向 stdout 寫(xiě)入無(wú)效的 MCP 消息內(nèi)容。
客戶(hù)端不得向服務(wù)器的 stdin 寫(xiě)入無(wú)效的 MCP 消息內(nèi)容。
stdio 通信流程
客戶(hù)端以子進(jìn)程的方式啟動(dòng)服務(wù)器
客戶(hù)端往服務(wù)器的 stdin 寫(xiě)入消息
服務(wù)器從自身的 stdin 讀取消息
服務(wù)端往自身的 stdout 寫(xiě)入消息
客戶(hù)端從服務(wù)器的 stdout 讀取消息
客戶(hù)端終止子進(jìn)程,關(guān)閉服務(wù)器的 stdin
服務(wù)器關(guān)閉自身的 stdout
stdio 傳輸實(shí)現(xiàn)
參考 MCP 官方的typescript-sdk來(lái)看 stdio 傳輸機(jī)制是如何實(shí)現(xiàn)的:
啟動(dòng) MCP 服務(wù)器
以命令行的方式,在本地啟動(dòng) MCP 服務(wù)器:
npx -y mcp-server-time
創(chuàng)建 stdio 通信管道
MCP 服務(wù)器啟動(dòng)時(shí),會(huì)創(chuàng)建 stdio 通信管道(pipeline),用于跟 MCP 客戶(hù)端進(jìn)行消息通信。在 MCP 客戶(hù)端發(fā)送關(guān)閉信號(hào),或者 MCP 服務(wù)器異常退出之前,這個(gè)通信管道會(huì)一直保持,常駐進(jìn)程。
exportclassStdioServerTransportimplementsTransport { private_readBuffer: ReadBuffer =newReadBuffer(); private_started =false; constructor( private_stdin: Readable = process.stdin, private_stdout: Writable = process.stdout ) {} onclose?:()=>void; onerror?:(error:Error) =>void; onmessage?:(message: JSONRPCMessage) =>void; }
從 stdin 讀取請(qǐng)求消息
MCP 客戶(hù)端把消息發(fā)到通信管道。MCP 服務(wù)器通過(guò)標(biāo)準(zhǔn)輸入 stdin 讀取客戶(hù)端發(fā)送的消息,以換行符: 作為讀取完成標(biāo)識(shí)。
MCP 服務(wù)器讀取到的有效消息,是 JSON-RPC 編碼的結(jié)構(gòu)體。
readMessage(): JSONRPCMessage |null{ if(!this._buffer) { returnnull; } constindex =this._buffer.indexOf(" "); if(index ===-1) { returnnull; } constline =this._buffer.toString("utf8",0, index).replace(/ $/,''); this._buffer =this._buffer.subarray(index +1); returndeserializeMessage(line); }
往 stdout 寫(xiě)入響應(yīng)消息
MCP 服務(wù)器運(yùn)行完內(nèi)部邏輯,需要給 MCP 客戶(hù)端響應(yīng)消息。
MCP 服務(wù)器先用 JSON-RPC 編碼消息,再把消息寫(xiě)入標(biāo)準(zhǔn)輸出 stdout。
send(message: JSONRPCMessage):Promise{ returnnewPromise((resolve) =>{ constjson = serializeMessage(message); if(this._stdout.write(json)) { resolve(); }else{ this._stdout.once("drain", resolve); } }); } }
MCP 客戶(hù)端從 MCP 服務(wù)器的標(biāo)準(zhǔn)輸出 stdout 讀取消息,獲得 MCP 服務(wù)器的響應(yīng)內(nèi)容。
關(guān)閉 stdio 通信管道
MCP 客戶(hù)端退出,給 MCP 服務(wù)器發(fā)送關(guān)閉信號(hào)。
MCP 服務(wù)器通過(guò) stdio 通信管道讀到客戶(hù)端發(fā)送的終止信號(hào),或者內(nèi)部運(yùn)行錯(cuò)誤,主動(dòng)關(guān)閉 stdio 通信管道。
stdio 通信管道關(guān)閉之后,MCP 客戶(hù)端與 MCP 服務(wù)器之間不能再相互發(fā)送消息,直到再次建立 stdio 通信管道。
asyncclose():Promise{ // Remove our event listeners first this._stdin.off("data",this._ondata); this._stdin.off("error",this._onerror); // Check if we were the only data listener constremainingDataListeners =this._stdin.listenerCount('data'); if(remainingDataListeners ===0) { // Only pause stdin if we were the only listener // This prevents interfering with other parts of the application that might be using stdin this._stdin.pause(); } // Clear the buffer and notify closure this._readBuffer.clear(); this.onclose?.(); }
stdio 傳輸?shù)睦?/p>
stdio 傳輸機(jī)制主要依靠本地進(jìn)程通信實(shí)現(xiàn)。
主要的優(yōu)勢(shì)是:
無(wú)外部依賴(lài),實(shí)現(xiàn)簡(jiǎn)單
無(wú)網(wǎng)絡(luò)傳輸,通信速度快
本地通信,安全性高
也有一些局限性:
單進(jìn)程通信,無(wú)法并行處理多個(gè)客戶(hù)端請(qǐng)求
進(jìn)程通信的資源開(kāi)銷(xiāo)大,很難在本地運(yùn)行非常多的服務(wù)
stdio 傳輸?shù)倪m用場(chǎng)景
stdio 傳輸適用于要操作的數(shù)據(jù)資源位于本地計(jì)算機(jī),且不希望暴露外部訪問(wèn)的場(chǎng)景。
比如,你希望通過(guò)一個(gè)聊天客戶(hù)端,來(lái)總結(jié)你的微信消息,微信消息文件存儲(chǔ)在你的本地電腦,外部訪問(wèn)不了,也不應(yīng)該訪問(wèn)。
這種情況,你可以實(shí)現(xiàn)一個(gè) MCP 服務(wù)器來(lái)讀取你電腦上的微信消息文件,通過(guò) stdio 傳輸接收 MCP 客戶(hù)端的訪問(wèn)請(qǐng)求。
如果你要訪問(wèn)的是一個(gè)遠(yuǎn)程服務(wù)器上的文件,也可以使用 stdio 傳輸,流程會(huì)復(fù)雜一些:
先寫(xiě)一個(gè) API 服務(wù),部署在遠(yuǎn)程服務(wù)器,操作遠(yuǎn)程服務(wù)器上的資源,暴露公網(wǎng)訪問(wèn)
寫(xiě)一個(gè) MCP 服務(wù)器,對(duì)接遠(yuǎn)程 API,再通過(guò) stdio 傳輸與客戶(hù)端本地通信
既然 stdio 傳輸訪問(wèn)遠(yuǎn)程資源這么麻煩,是不是應(yīng)該有一種更適合遠(yuǎn)程資源訪問(wèn)的傳輸機(jī)制?
當(dāng)然有。可以使用 SSE 傳輸。
SSE 傳輸
MCP 協(xié)議使用 SSE(Server-Sent Events) 傳輸來(lái)解決遠(yuǎn)程資源訪問(wèn)的問(wèn)題。底層是基于 HTTP 通信,通過(guò)類(lèi)似 API 的方式,讓 MCP 客戶(hù)端直接訪問(wèn)遠(yuǎn)程資源,而不用通過(guò) stdio 傳輸做中轉(zhuǎn)。
在 SSE 傳輸中,服務(wù)器作為一個(gè)獨(dú)立進(jìn)程運(yùn)行,可以處理多個(gè)客戶(hù)端連接。
服務(wù)器必須提供兩個(gè)端點(diǎn):
一個(gè) SSE 端點(diǎn),供客戶(hù)端建立連接并從服務(wù)器接收消息
一個(gè)常規(guī) HTTP POST 端點(diǎn),供客戶(hù)端向服務(wù)器發(fā)送消息
當(dāng)客戶(hù)端連接時(shí),服務(wù)器必須發(fā)送一個(gè)包含客戶(hù)端用于發(fā)送消息的 URL 的端點(diǎn)事件。所有后續(xù)客戶(hù)端消息必須作為 HTTP POST 請(qǐng)求發(fā)送到該端點(diǎn)。
服務(wù)器消息作為 SSE 消息事件發(fā)送,消息內(nèi)容以 JSON 格式編碼在事件數(shù)據(jù)中。
SSE 通信流程
客戶(hù)端向服務(wù)器的 /sse 端點(diǎn)發(fā)送請(qǐng)求(一般是 GET 請(qǐng)求),建立 SSE 連接
服務(wù)器給客戶(hù)端返回一個(gè)包含消息端點(diǎn)地址的事件消息
客戶(hù)端給消息端點(diǎn)發(fā)送消息
服務(wù)器給客戶(hù)端響應(yīng)消息已接收狀態(tài)碼
服務(wù)器給雙方建立的 SSE 連接推送事件消息
客戶(hù)端從 SSE 連接讀取服務(wù)器發(fā)送的事件消息
客戶(hù)端關(guān)閉 SSE 連接
SSE 傳輸實(shí)現(xiàn)
參考 MCP 官方的typescript-sdk來(lái)看 SSE 傳輸機(jī)制是如何實(shí)現(xiàn)的:
啟動(dòng) MCP 服務(wù)器
系統(tǒng)管理員在遠(yuǎn)程服務(wù)器(也可以是本地電腦)輸入命令,啟動(dòng) MCP 服務(wù)器,監(jiān)聽(tīng)服務(wù)器端口,對(duì)外暴露 HTTP 接口。
constserver =newMcpServer({ name:"example-server", version:"1.0.0", }); constapp = express(); app.get("/sse",async(_: Request, res: Response) => { consttransport =newSSEServerTransport("/messages", res); awaitserver.connect(transport); }); app.post("/messages",async(req: Request, res: Response) => { awaittransport.handlePostMessage(req, res); }); app.listen(3001);
在這個(gè)示例中,使用了express框架,啟動(dòng)了一個(gè) HTTP 服務(wù),監(jiān)聽(tīng)在 3001 端口,對(duì)外暴露了兩個(gè)端點(diǎn):
/sse:GET 請(qǐng)求,用于建立 SSE 連接
/messages:POST 請(qǐng)求,用于接收客戶(hù)端發(fā)送的消息
解析一個(gè) MCP 客戶(hù)端可訪問(wèn)的域名到 MCP 服務(wù)器。比如:abc.mcp.so
建立 SSE 連接
MCP 客戶(hù)端請(qǐng)求 MCP 服務(wù)器的 URL 地址:https://abc.mcp.so:3001/sse與 MCP 服務(wù)器建立連接,MCP 服務(wù)器需要給 MCP 客戶(hù)端返回一個(gè)用于消息通信的地址:
res.writeHead(200, { "Content-Type":"text/event-stream", "Cache-Control":"no-cache, no-transform", Connection:"keep-alive", }); constmessagesUrl ="https://abc.mcp.so:3001/messages?sessionId=xxx"; res.write(`event: endpoint data:${messagesUrl} `);
MCP 客戶(hù)端從 MCP 服務(wù)器返回的 endpoint 事件中得到了消息通信的地址,與 MCP 服務(wù)器建立 SSE 連接成功。
MCP 客戶(hù)端通過(guò) POST 請(qǐng)求把消息發(fā)到這個(gè)消息通信地址,與 MCP 服務(wù)器進(jìn)行消息交互。
消息交互
MCP 客戶(hù)端在與 MCP 服務(wù)器建立 SSE 連接之后,開(kāi)始給 MCP 服務(wù)器返回的通信地址發(fā)送消息。
MCP 協(xié)議中的 SSE 傳輸是雙通道響應(yīng)機(jī)制。也就是說(shuō),MCP 服務(wù)器在接收到 MCP 客戶(hù)端的消息之后,既要給當(dāng)前的請(qǐng)求回復(fù)一個(gè)響應(yīng),也要給之前建立的 SSE 連接發(fā)送一條響應(yīng)消息。(通知類(lèi)型的消息,不需要給 SSE 連接發(fā)消息)
舉個(gè)例子,MCP 客戶(hù)端與 MCP 服務(wù)器建立 SSE 連接之后,給 MCP 服務(wù)器發(fā)送的第一條消息,用于初始化階段做能力協(xié)商。
MCP 客戶(hù)端請(qǐng)求示例:
curl -X POST https://abc.mcp.so/messages?sessionId=xxx -H"Content-Type: application/json" -d '{ "jsonrpc":"2.0", "id":"1", "method":"initialize", "params": { "protocolVersion":"1.0", "capabilities": {}, "clientInfo": { "name":"mcp-client", "version":"1.0.0" } } }'
MCP 服務(wù)器從 HTTP 請(qǐng)求體里面讀取 MCP 客戶(hù)端發(fā)送的消息:
asynchandlePostMessage( req: IncomingMessage, res: ServerResponse, parsedBody?: unknown, ):Promise{ if(!this._sseResponse) { constmessage ="SSE connection not established"; res.writeHead(500).end(message); thrownewError(message); } letbody:string| unknown; try{ constct = contentType.parse(req.headers["content-type"] ??""); if(ct.type !=="application/json") { thrownewError(`Unsupported content-type:${ct}`); } body = parsedBody ??awaitgetRawBody(req, { limit: MAXIMUM_MESSAGE_SIZE, encoding: ct.parameters.charset ??"utf-8", }); }catch(error) { res.writeHead(400).end(String(error)); this.onerror?.(errorasError); return; } try{ awaitthis.handleMessage(typeofbody ==='string'?JSON.parse(body) : body); }catch{ res.writeHead(400).end(`Invalid message:${body}`); return; } res.writeHead(202).end("Accepted"); }
先給當(dāng)前請(qǐng)求,響應(yīng)一個(gè) HTTP 202 狀態(tài)碼,告知 MCP 客戶(hù)端,請(qǐng)求已收到。
然后 MCP 服務(wù)器運(yùn)行內(nèi)部邏輯,完成業(yè)務(wù)功能。
再給之前與 MCP 客戶(hù)端建立的 SSE 連接發(fā)送一個(gè)事件消息,data 里面放 JSON-RPC 編碼的消息內(nèi)容:
asyncsend(message: JSONRPCMessage):Promise{ if(!this._sseResponse) { thrownewError("Not connected"); } this._sseResponse.write( `event: message data:${JSON.stringify(message)} `, ); }
MCP 客戶(hù)端根據(jù) MCP 服務(wù)器同步響應(yīng)的 2** 狀態(tài)碼,判斷 MCP 服務(wù)器已經(jīng)接到請(qǐng)求,并開(kāi)始讀取 MCP 服務(wù)器發(fā)到 SSE 連接的消息內(nèi)容。
MCP 客戶(hù)端從與 MCP 服務(wù)器建立的 SSE 連接中讀取event: message事件消息,獲得 MCP 服務(wù)器發(fā)送的業(yè)務(wù)數(shù)據(jù)data。
MCP 客戶(hù)端與 MCP 服務(wù)器建立的 SSE 連接,應(yīng)該是 1:1 的。為了防止串?dāng)?shù)據(jù)的問(wèn)題,在建立 SSE 連接階段,MCP 服務(wù)器返回的通信地址,應(yīng)該為當(dāng)前連接分配一個(gè)唯一標(biāo)識(shí),叫做sessionId,給 MCP 客戶(hù)端返回的通信地址帶上這個(gè)標(biāo)識(shí),比如/messages?sessionId=xxx。
在消息交互階段,MCP 服務(wù)器根據(jù) MCP 客戶(hù)端請(qǐng)求地址參數(shù)里面的 sessionId,找到之前建立的 SSE 連接,并只給這個(gè) SSE 連接發(fā)送消息。
斷開(kāi) SSE 連接
MCP 服務(wù)器與 MCP 客戶(hù)端雙方都可能會(huì)主動(dòng)斷開(kāi) SSE 連接。
還保持連接的一方,應(yīng)該加上必要的連接檢測(cè)和超時(shí)關(guān)閉機(jī)制。
比如通過(guò) SSE 連接,給對(duì)方定時(shí)發(fā)送一條心跳檢測(cè)消息,如果多次無(wú)響應(yīng),可以認(rèn)作對(duì)方已斷開(kāi)連接,此時(shí)可以主動(dòng)關(guān)閉 SSE 連接,避免資源泄露。
一個(gè)用 go 實(shí)現(xiàn)的心跳檢測(cè)和超時(shí)關(guān)閉示例:
// Setup heartbeat ticker heartbeatInterval :=30* time.Second heartbeatTicker := time.NewTicker(heartbeatInterval) deferheartbeatTicker.Stop() // Setup idle timeout idleTimeout :=5* time.Minute idleTimer := time.NewTimer(idleTimeout) deferidleTimer.Stop() gofunc(){ for{ select{ case<-session.Done(): ? ? ??return ? ??case?<-heartbeatTicker.C: ? ? ??// Send heartbeat ? ? ??if?err := writer.SendHeartbeat(); err !=?nil?{ ? ? ? ? session.Close() ? ? ? ??return ? ? ? } ? ??case?<-idleTimer.C: ? ? ??// Close connection due to inactivity ? ? ? session.Close() ? ? ??return ? ? } ? } }()
SSE 安全防護(hù)
當(dāng)使用 SSE 傳輸時(shí),服務(wù)器一方需要實(shí)現(xiàn)一些必要的安全防護(hù)措施:
服務(wù)器必須驗(yàn)證所有傳入連接的 Origin 頭,以防止 DNS 重綁定攻擊
在本地運(yùn)行時(shí),服務(wù)器應(yīng)僅綁定到 localhost(127.0.0.1),而不是所有網(wǎng)絡(luò)接口(0.0.0.0)
服務(wù)器應(yīng)對(duì)所有連接實(shí)施適當(dāng)?shù)纳矸蒡?yàn)證
如果沒(méi)有這些保護(hù)措施,攻擊者可能會(huì)使用 DNS 重綁定從遠(yuǎn)程網(wǎng)站與本地 MCP 服務(wù)器交互。
SSE 傳輸?shù)倪m用場(chǎng)景
SSE 傳輸適用于 MCP 客戶(hù)端與 MCP 服務(wù)器不在同一個(gè)網(wǎng)絡(luò)下的通信場(chǎng)景。
比如,你希望在本地電腦,通過(guò)對(duì)話的方式,查詢(xún)你云服務(wù)器上的數(shù)據(jù)庫(kù)。你就可以在你的云服務(wù)器上部署一個(gè) MCP 服務(wù)器,去讀取數(shù)據(jù)庫(kù),再跟你本地電腦上的 MCP 客戶(hù)端建立連接通信。
當(dāng)然,所有用 SSE 傳輸實(shí)現(xiàn)的 MCP 服務(wù)器,理論上都可以通過(guò) stdio 傳輸 + API 的方式實(shí)現(xiàn)。
區(qū)別在于:
用 SSE 傳輸,MCP 客戶(hù)端直接與 MCP 服務(wù)器通信,而不用通過(guò)本地的 stdio 傳輸調(diào)用 API 進(jìn)行中轉(zhuǎn)。
用 SSE 傳輸,在 MCP 客戶(hù)端只需要一個(gè) URL 即可接入,對(duì)本地環(huán)境無(wú)要求,也無(wú)需在本地運(yùn)行 MCP 服務(wù)器,用戶(hù)側(cè)的使用門(mén)檻更低。
SSE 傳輸?shù)睦?/p>
SSE 傳輸主要解決遠(yuǎn)程資源訪問(wèn)的問(wèn)題,依靠 HTTP 協(xié)議實(shí)現(xiàn)底層通信。
SSE 傳輸?shù)闹饕獌?yōu)勢(shì):
支持遠(yuǎn)程資源訪問(wèn),讓 MCP 客戶(hù)端可以直接訪問(wèn)遠(yuǎn)程服務(wù),解決了 stdio 傳輸僅適用于本地資源的局限
基于標(biāo)準(zhǔn) HTTP 協(xié)議實(shí)現(xiàn),兼容性好,便于與現(xiàn)有 Web 基礎(chǔ)設(shè)施集成
服務(wù)器可作為獨(dú)立進(jìn)程運(yùn)行,支持處理多個(gè)客戶(hù)端連接
相比 WebSocket 實(shí)現(xiàn)簡(jiǎn)單,是普通 HTTP 的擴(kuò)展,不需要協(xié)議升級(jí)
SSE 傳輸?shù)闹饕觿?shì)與問(wèn)題:
連接不穩(wěn)定:在無(wú)服務(wù)器(serverless)環(huán)境中,SSE 連接會(huì)隨機(jī)、頻繁斷開(kāi),影響 AI 代理需要的可靠持久連接
擴(kuò)展性挑戰(zhàn):SSE 不是為云原生架構(gòu)設(shè)計(jì)的,在擴(kuò)展平臺(tái)時(shí)會(huì)遇到瓶頸
瀏覽器連接限制:每個(gè)瀏覽器和域名的最大打開(kāi)連接數(shù)很低(6 個(gè)),當(dāng)用戶(hù)打開(kāi)多個(gè)標(biāo)簽頁(yè)時(shí)會(huì)出現(xiàn)問(wèn)題
代理和防火墻問(wèn)題:某些代理和防火墻會(huì)因?yàn)槿鄙?Content-Length 頭而阻止 SSE 連接,在企業(yè)環(huán)境部署時(shí)造成挑戰(zhàn)
復(fù)雜的雙通道響應(yīng)機(jī)制:MCP 中的 SSE 實(shí)現(xiàn)要求服務(wù)器在接收客戶(hù)端消息后,既要給當(dāng)前請(qǐng)求響應(yīng),也要給之前建立的 SSE 連接發(fā)送響應(yīng)消息
無(wú)法支持長(zhǎng)期的無(wú)服務(wù)器部署:無(wú)服務(wù)器架構(gòu)通常自動(dòng)擴(kuò)縮容,不適合長(zhǎng)時(shí)間連接,而 SSE 需要維持持久連接
需要大量會(huì)話管理:需要為每個(gè) SSE 連接分配唯一標(biāo)識(shí)(sessionId)來(lái)防止數(shù)據(jù)混淆,增加了實(shí)現(xiàn)復(fù)雜度
需要額外的連接檢測(cè)和超時(shí)關(guān)閉機(jī)制:需要實(shí)現(xiàn)心跳檢測(cè)和超時(shí)機(jī)制來(lái)避免資源泄露
正因?yàn)檫@些問(wèn)題,MCP 協(xié)議已經(jīng)引入了新的 Streamable HTTP 傳輸機(jī)制(2025-03-26 版本)來(lái)替代 SSE,并計(jì)劃廢棄 SSE 傳輸。新的傳輸機(jī)制保留了 HTTP 的基礎(chǔ),但支持更靈活的連接方式,更適合現(xiàn)代云架構(gòu)和無(wú)服務(wù)器環(huán)境。
Streamable HTTP 傳輸
Streamable HTTP 傳輸是 MCP 協(xié)議在 2025-03-26 版本中引入的新傳輸機(jī)制,用于替代之前的 SSE 傳輸。
在 Streamable HTTP 傳輸中,服務(wù)器作為一個(gè)獨(dú)立進(jìn)程運(yùn)行,可以處理多個(gè)客戶(hù)端連接。此傳輸使用 HTTP POST 和 GET 請(qǐng)求,服務(wù)器可以選擇使用服務(wù)器發(fā)送事件(SSE)來(lái)流式傳輸多個(gè)服務(wù)器消息。
服務(wù)器必須提供一個(gè)同時(shí)支持 POST 和 GET 方法的單個(gè) HTTP 端點(diǎn)。例如:https://xyz.mcp.so/mcp。
Streamable HTTP 通信流程
客戶(hù)端給服務(wù)器的通信端點(diǎn)發(fā)消息
服務(wù)器給客戶(hù)端響應(yīng)消息
客戶(hù)端根據(jù)服務(wù)器的響應(yīng)類(lèi)型,繼續(xù)給服務(wù)器發(fā)消息
服務(wù)器繼續(xù)響應(yīng)客戶(hù)端消息
跟 SSE 傳輸不同的點(diǎn)在于,Streamable HTTP 傳輸中,客戶(hù)端與服務(wù)器的消息交互,基本上是“一來(lái)一回”的(單通道響應(yīng))。而這個(gè)“一來(lái)一回”的消息交互,可能會(huì)有很多種組合類(lèi)型。
客戶(hù)端發(fā)送 GET 請(qǐng)求給服務(wù)器,服務(wù)器返回 SSE 連接
客戶(hù)端 POST JSON-RPC 編碼的消息給服務(wù)器,服務(wù)器返回 JSON-RPC 編碼的消息響應(yīng)
客戶(hù)端 POST JSON-RPC 編碼的消息給服務(wù)器,服務(wù)器返回一個(gè) SSE 連接
客戶(hù)端給 SSE 連接發(fā)消息,服務(wù)器收到后給 SSE 連接響應(yīng)消息
服務(wù)器響應(yīng)的消息,可能包含狀態(tài)標(biāo)識(shí):Mcp-Session-Id
客戶(hù)端發(fā)消息時(shí)候需要帶上狀態(tài)標(biāo)識(shí):Mcp-Session-Id
Streamable HTTP 傳輸實(shí)現(xiàn)
參考 MCP 官方的typescript-sdk來(lái)看 Streamable HTTP 傳輸機(jī)制是如何實(shí)現(xiàn)的:
啟動(dòng)服務(wù)器
跟 SSE 傳輸機(jī)制一樣,Streamable HTTP 傳輸本質(zhì)上也是基于 HTTP 協(xié)議通信,需要先啟動(dòng)一個(gè) HTTP 服務(wù):
constserver =newMcpServer({ name:"example-server", version:"1.0.0", }); constapp = express(); app.all("/mcp",async(req: Request, res: Response) => { consttransport =newStreamableHTTPServerTransport(); awaitserver.connect(transport); awaittransport.handleMessage(req, res); }); app.listen(3002);
跟 SSE 傳輸不一樣的點(diǎn)在于,Streamable HTTP 傳輸只需要暴露一個(gè)端點(diǎn)(endpoint),來(lái)接收各種類(lèi)型的客戶(hù)端請(qǐng)求(GET / POST / DELETE)。
比如在這個(gè)例子,使用express框架,啟動(dòng)了一個(gè) HTTP 服務(wù),監(jiān)聽(tīng)在 3002 端口,對(duì)外暴露了一個(gè)端點(diǎn):
/mcp:接收客戶(hù)端建立連接、交換消息的請(qǐng)求
解析一個(gè) MCP 客戶(hù)端可訪問(wèn)的域名到 MCP 服務(wù)器。比如:xyz.mcp.so
消息交互
在 MCP 服務(wù)器啟動(dòng)成功之后,MCP 客戶(hù)端可以直接給 MCP 服務(wù)器暴露的地址發(fā)送消息。
跟 SSE 傳輸不同,Streamable HTTP 傳輸機(jī)制中,MCP 客戶(hù)端給服務(wù)器發(fā)送消息,無(wú)需先跟一個(gè)端點(diǎn)建立 SSE 連接,再給另一個(gè)端點(diǎn)發(fā)消息。而是單通道模式,即客戶(hù)端給服務(wù)器發(fā)消息,直接獲得服務(wù)器的響應(yīng)內(nèi)容。
MCP 客戶(hù)端可以使用 GET 或者 POST 請(qǐng)求給服務(wù)器發(fā)消息,每個(gè)請(qǐng)求必須設(shè)置請(qǐng)求頭Accept,傳遞以下兩個(gè)值:
application/json 接收服務(wù)器響應(yīng)的 JSON-RPC 編碼消息
text/event-stream 由服務(wù)器開(kāi)啟流式傳輸通道,客戶(hù)端從這個(gè)流里面讀取事件消息
MCP 客戶(hù)端請(qǐng)求示例:
curl -X POST https://xyz.mcp.so/mcp -H "Content-Type: application/json" -H "Accept: application/json, text/event-stream" -d '{ "jsonrpc": "2.0", "id": "1", "method": "initialize", "params": { "protocolVersion": "1.0", "capabilities": {}, "clientInfo": { "name": "mcp-client", "version": "1.0.0" } } }'
Streamable HTTP 傳輸機(jī)制下,MCP 客戶(hù)端與服務(wù)器通信的幾個(gè)要點(diǎn):
客戶(hù)端可以給服務(wù)器發(fā)送不包含請(qǐng)求體的 GET 請(qǐng)求,用于建立 SSE 連接,讓服務(wù)器可以主動(dòng)給客戶(hù)端先發(fā)消息
客戶(hù)端給服務(wù)器發(fā)送 JSON-RPC 消息的情況,必須使用 POST 請(qǐng)求
服務(wù)器接到客戶(hù)端的 GET 請(qǐng)求時(shí),要么返回Contet-Type: text/event-stream開(kāi)啟 SSE 連接,要么返回 HTTP 405 狀態(tài)碼,表示不支持 SSE 連接。
服務(wù)器接到客戶(hù)端的 POST 請(qǐng)求時(shí),從請(qǐng)求體里面讀取 JSON-RPC 消息,如果是通知消息,就響應(yīng) HTTP 202 狀態(tài)碼,表示消息已收到。如果是非通知消息,服務(wù)器可以選擇返回Content-Type: text/event-stream開(kāi)啟 SSE 傳輸,或者返回Content-Type: application/json同步響應(yīng)一條 JSON-RPC 消息。
會(huì)話保持
Streamable HTTP 傳輸既支持無(wú)狀態(tài)的請(qǐng)求:每一次請(qǐng)求都是獨(dú)立的,無(wú)需記錄狀態(tài)。
也支持有狀態(tài)的請(qǐng)求:一次新的請(qǐng)求,可能需要同步之前的請(qǐng)求 / 響應(yīng)信息作為參考。這種情況叫做:會(huì)話保持。
如果需要保持會(huì)話,MCP 服務(wù)器與 MCP 客戶(hù)端之間的交互應(yīng)該遵守以下原則:
使用 Streamable HTTP 傳輸?shù)姆?wù)器可以在初始化時(shí)分配一個(gè)會(huì)話 ID,方法是在包含InitializeResult的 HTTP 響應(yīng)中包含它,放在Mcp-Session-Id頭中。
如果服務(wù)器在初始化期間返回了Mcp-Session-Id,使用 Streamable HTTP 傳輸?shù)目蛻?hù)端必須在所有后續(xù)的 HTTP 請(qǐng)求中在Mcp-Session-Id頭中包含它。
服務(wù)器可以隨時(shí)終止會(huì)話,之后它必須使用 HTTP 404 Not Found 響應(yīng)包含該會(huì)話 ID 的請(qǐng)求。
當(dāng)客戶(hù)端收到對(duì)包含Mcp-Session-Id的請(qǐng)求的 HTTP 404 響應(yīng)時(shí),它必須通過(guò)發(fā)送一個(gè)不帶會(huì)話 ID 的新InitializeRequest來(lái)啟動(dòng)一個(gè)新會(huì)話。
不再需要特定會(huì)話的客戶(hù)端應(yīng)該發(fā)送一個(gè)帶有Mcp-Session-Id頭的 HTTP DELETE 到 MCP 端點(diǎn),以顯式終止會(huì)話。
MCP 服務(wù)器驗(yàn)證會(huì)話的一個(gè)示例:
/** * Validates session ID for non-initialization requests * Returns true if the session is valid, false otherwise */ privatevalidateSession(req: IncomingMessage, res: ServerResponse):boolean{ if(!this._initialized) { // If the server has not been initialized yet, reject all requests res.writeHead(400).end(JSON.stringify({ jsonrpc:"2.0", error: { code:-32000, message:"Bad Request: Server not initialized" }, id:null })); returnfalse; } if(this.sessionId ===undefined) { // If the session ID is not set, the session management is disabled // and we don't need to validate the session ID returntrue; } constsessionId = req.headers["mcp-session-id"]; if(!sessionId) { // Non-initialization requests without a session ID should return 400 Bad Request res.writeHead(400).end(JSON.stringify({ jsonrpc:"2.0", error: { code:-32000, message:"Bad Request: Mcp-Session-Id header is required" }, id:null })); returnfalse; }elseif(Array.isArray(sessionId)) { res.writeHead(400).end(JSON.stringify({ jsonrpc:"2.0", error: { code:-32000, message:"Bad Request: Mcp-Session-Id header must be a single value" }, id:null })); returnfalse; } elseif(sessionId !==this.sessionId) { // Reject requests with invalid session ID with 404 Not Found res.writeHead(404).end(JSON.stringify({ jsonrpc:"2.0", error: { code:-32001, message:"Session not found" }, id:null })); returnfalse; } returntrue; }
連接斷開(kāi)與重連
Streamable HTTP 傳輸,如果客戶(hù)端與服務(wù)器使用 SSE 連接通信,斷開(kāi)連接的方式跟 SSE 傳輸斷開(kāi)連接的方式一致。
可以由連接的任意一方主動(dòng)斷開(kāi)連接。還保持著連接的一方,需要實(shí)現(xiàn)心跳檢測(cè)和超時(shí)機(jī)制,以便能及時(shí)關(guān)閉連接,避免資源泄露。
Streamable HTTP 傳輸比起 SSE 傳輸,做了一些改進(jìn),支持恢復(fù)已中斷的連接,重新發(fā)送可能丟失的消息:
服務(wù)器可以在其 SSE 事件中附加一個(gè) ID 字段。如果存在,ID 必須在所有會(huì)話所有流中全局唯一。
如果客戶(hù)端希望在斷開(kāi)連接后恢復(fù),它應(yīng)該向服務(wù)器發(fā)出 HTTP GET 請(qǐng)求,并包含 Last-Event-ID 頭,告知服務(wù)器它接收到的最后一個(gè)事件 ID。服務(wù)器可以重放在最后一個(gè)事件 ID 之后將發(fā)送的消息,并從該點(diǎn)恢復(fù)流。
支持?jǐn)帱c(diǎn)重連的 Streamable HTTP 傳輸,在消息傳輸方面會(huì)比 SSE 傳輸更加可靠。
Streamable HTTP 傳輸?shù)睦?/p>
Streamable HTTP 傳輸機(jī)制結(jié)合了 SSE 傳輸?shù)倪h(yuǎn)程訪問(wèn)能力和無(wú)狀態(tài) HTTP 的靈活性,同時(shí)解決了 SSE 傳輸中的許多問(wèn)題。
主要優(yōu)勢(shì):
兼容無(wú)服務(wù)器環(huán)境,可以在短連接模式下工作
靈活的連接模式,支持簡(jiǎn)單的請(qǐng)求-響應(yīng)和流式傳輸
會(huì)話管理更加標(biāo)準(zhǔn)化和清晰
支持?jǐn)嚅_(kāi)連接恢復(fù)和消息重傳
保留了 SSE 的流式傳輸能力,同時(shí)解決了其穩(wěn)定性問(wèn)題
向后兼容,可以支持舊版客戶(hù)端和服務(wù)器
主要劣勢(shì):
相比單純的 stdio 傳輸實(shí)現(xiàn)復(fù)雜度更高
仍需處理網(wǎng)絡(luò)連接斷開(kāi)和恢復(fù)的邏輯
會(huì)話管理需要服務(wù)器引入額外的組件(比如用 Redis 來(lái)存儲(chǔ) Session)
Streamable HTTP 傳輸?shù)倪m用場(chǎng)景
Streamable HTTP 傳輸適用于:
需要遠(yuǎn)程訪問(wèn)服務(wù)的場(chǎng)景,特別是云環(huán)境和無(wú)服務(wù)器架構(gòu)
需要支持流式輸出的 AI 服務(wù)
需要服務(wù)器主動(dòng)推送消息給客戶(hù)端的場(chǎng)景
大規(guī)模部署需要高可靠性和可擴(kuò)展性的服務(wù)
需要在不穩(wěn)定網(wǎng)絡(luò)環(huán)境中保持可靠通信的場(chǎng)景
與 SSE 傳輸相比,Streamable HTTP 傳輸是一個(gè)更全面、更靈活的解決方案,特別適合現(xiàn)代云原生應(yīng)用和無(wú)服務(wù)器環(huán)境。
自定義傳輸
MCP 客戶(hù)端和 MCP 服務(wù)器可以實(shí)現(xiàn)額外的自定義傳輸機(jī)制以滿(mǎn)足其特定需求。MCP 協(xié)議與傳輸無(wú)關(guān),可以在支持雙向消息交換的任何通信通道上實(shí)現(xiàn)。
選擇支持自定義傳輸?shù)膶?shí)現(xiàn)者必須確保他們保留由 MCP 定義的 JSON-RPC 消息格式和生命周期要求。自定義傳輸應(yīng)記錄其特定的連接建立和消息交換模式,以實(shí)現(xiàn)互操作性。
用一個(gè)實(shí)際的例子來(lái)說(shuō)明,如何實(shí)現(xiàn)一個(gè)自定義傳輸,來(lái)滿(mǎn)足特定的需求。
需求分析
目前市面上大部分的 MCP 服務(wù)器是使用 stdio 和 SSE 傳輸機(jī)制實(shí)現(xiàn)的,用戶(hù)要使用這些 MCP 服務(wù)器,需要拉代碼到本地運(yùn)行,使用門(mén)檻有點(diǎn)高。
我們希望實(shí)現(xiàn)一個(gè) MCP 代理服務(wù),在云上部署,讓用戶(hù)僅需要配置一個(gè) URL 即可接入。由這個(gè)云端部署的 MCP 代理去對(duì)接第三方的 MCP 服務(wù)器。
大致的流程是:
這個(gè) MCP 代理服務(wù)以 HTTP 的形式提供接入,需要支持并發(fā)調(diào)用。這個(gè) MCP 代理服務(wù)作為 stdio 傳輸或者 SSE 傳輸?shù)目蛻?hù)端,發(fā)送消息給后臺(tái)的 MCP 服務(wù)器。
按照這個(gè)方案,MCP 代理服務(wù)跟后臺(tái)部署的第三方服務(wù)器之間的交互存在著一些問(wèn)題:
如果后臺(tái)對(duì)接的 MCP 服務(wù)器是基于 stdio 傳輸實(shí)現(xiàn)的,MCP 代理每接到一個(gè)用戶(hù)請(qǐng)求,都需要調(diào)用 MCP 服務(wù)器創(chuàng)建一個(gè)獨(dú)立的 stdio 通信進(jìn)程。服務(wù)器開(kāi)銷(xiāo)特別大(進(jìn)程創(chuàng)建和銷(xiāo)毀、上下文切換,內(nèi)存隔離等操作,都很消耗服務(wù)器資源)。
如果后臺(tái)對(duì)接的 MCP 服務(wù)器是基于 SSE 傳輸實(shí)現(xiàn)的,從前面對(duì) SSE 傳輸?shù)姆治鑫覀冎溃琈CP 代理需要跟后臺(tái)部署的 MCP 服務(wù)器之間保持一個(gè) SSE 連接。如果每個(gè)用戶(hù)請(qǐng)求都需要維持一個(gè) SSE 的長(zhǎng)連接,對(duì)服務(wù)器也是不小的壓力。
為了解決這些問(wèn)題,對(duì)于 MCP 代理和其背后連接的 MCP 服務(wù)器,可以想到的一些優(yōu)化方案:
用 k8s 集群分布式部署,代替單機(jī)部署,保持 MCP 代理的彈性擴(kuò)容能力,支持更高的并發(fā)請(qǐng)求
修改第三方服務(wù)器的傳輸機(jī)制,如果是 stdio 進(jìn)程通信,可以改成輕量級(jí)的協(xié)程通信,或者使用 HTTP 通信,支持多路復(fù)用
如果第三方服務(wù)器的傳輸機(jī)制是 SSE,需要把雙通道響應(yīng)改成單通道響應(yīng),并且不應(yīng)該使用長(zhǎng)連接
分布式網(wǎng)絡(luò)下,會(huì)話保持需要引入一些額外的組件(比如 redis)或者額外的策略(比如用負(fù)載均衡的 IP Hash 策略把請(qǐng)求路由到固定的機(jī)器),如果 MCP 服務(wù)器不是必須要用到會(huì)話保持,那就最好改成無(wú)狀態(tài)的傳輸
基于以上幾點(diǎn)分析,我們可以設(shè)計(jì)一套自定義的傳輸機(jī)制,來(lái)改造在分布式集群部署的第三方 MCP 服務(wù)器。
這套自定義的傳輸機(jī)制應(yīng)該具備的幾個(gè)特性:
基于 HTTP 通信
非流式傳輸
無(wú)狀態(tài),無(wú)會(huì)話保持
短連接
雖然 Streamable HTTP 傳輸通過(guò)配置參數(shù)也能實(shí)現(xiàn)這些功能,但是我們還是希望能自定義一套更簡(jiǎn)單,更直接,零配置的傳輸機(jī)制。
我們把這個(gè)新的傳輸機(jī)制取名為:Restful HTTP Transport,簡(jiǎn)稱(chēng) Rest Transport
因?yàn)橐脑斓闹饕?MCP 服務(wù)器,接下來(lái)主要講解如何實(shí)現(xiàn)一個(gè)服務(wù)器使用的 Rest Server Transport.
實(shí)現(xiàn) Rest Server Transport
參考 MCP 官方實(shí)現(xiàn)的 Streamable HTTP Transport,我們來(lái)實(shí)現(xiàn)這個(gè)自定義的 Rest Server Transport。
定義新的傳輸類(lèi),實(shí)現(xiàn) MCP 協(xié)議的 Transport 接口
/** * Server transport for Synchronous HTTP: a stateless implementation for direct HTTP responses. * It supports concurrent requests with no streaming, no SSE, and no persistent connections. */ exportclassRestServerTransportimplementsTransport { // ... }
定義啟動(dòng)服務(wù)方法,用來(lái)啟動(dòng)一個(gè) HTTP 服務(wù)器,處理客戶(hù)端的請(qǐng)求
/** * Start the HTTP server */ asyncstartServer():Promise{ if(this._server) { thrownewError("Server is already running"); } this._server = express(); this._server.post(this._endpoint,(req, res) =>{ this.handleRequest(req, res, req.body); }); returnnewPromise((resolve, reject) =>{ try{ this._httpServer =this._server!.listen(this._port,()=>{ console.log( `Server is running on http://localhost:${this._port}${this._endpoint}` ); resolve(); }); this._httpServer.on("error",(error) =>{ console.error("Server error:", error); this.onerror?.(error); }); }catch(error) { reject(error); } }); }
跟 Streamable HTTP 傳輸一樣,自定義的傳輸在啟動(dòng) HTTP 服務(wù)器之后,我們也只暴露一個(gè)端點(diǎn)來(lái)接收客戶(hù)端請(qǐng)求,這個(gè)端點(diǎn)和 HTTP 服務(wù)器監(jiān)聽(tīng)的端口,都可以在啟動(dòng)服務(wù)器的時(shí)候自定義。
exportinterfaceRestServerTransportOptions { endpoint?:string; port?:string|number; }
接收客戶(hù)端請(qǐng)求
跟 Streamable HTTP 傳輸最主要的區(qū)別,我們自定義的這個(gè)傳輸,僅支持客戶(hù)端發(fā)起 POST 請(qǐng)求,客戶(hù)端也只會(huì)收到application/json響應(yīng),不會(huì)收到text/event-stream響應(yīng)。
客戶(hù)端與服務(wù)器使用短連接通信,不會(huì)有Connection: "keep-alive"。服務(wù)器響應(yīng)完,與客戶(hù)端的連接就會(huì)斷開(kāi)。
客戶(hù)端與服務(wù)器之間的消息交互是無(wú)狀態(tài)的,所以客戶(hù)端無(wú)需傳遞Mcp-Session-Id,服務(wù)器也不會(huì)判斷客戶(hù)端的會(huì)話有效性,不會(huì)維護(hù)任何 session 相關(guān)的數(shù)據(jù)。
Rest Server Transport 處理用戶(hù)請(qǐng)求的主要實(shí)現(xiàn)邏輯:
/** * Handles an incoming HTTP request */ asynchandleRequest( req: IncomingMessage, res: ServerResponse, parsedBody?: unknown ):Promise{ if(req.method ==="POST") { awaitthis.handlePostRequest(req, res, parsedBody); }else{ res.writeHead(405).end( JSON.stringify({ jsonrpc:"2.0", error: { code:-32000, message:"Method not allowed", }, id:null, }) ); } } /** * Handles POST requests containing JSON-RPC messages */ privateasynchandlePostRequest( req: IncomingMessage, res: ServerResponse, parsedBody?: unknown ):Promise { try{ // validate the Accept header constacceptHeader = req.headers.accept; if( acceptHeader && acceptHeader !=="*/*"&& !acceptHeader.includes("application/json") ) { res.writeHead(406).end( JSON.stringify({ jsonrpc:"2.0", error: { code:-32000, message:"Not Acceptable: Client must accept application/json", }, id:null, }) ); return; } constct = req.headers["content-type"]; if(!ct || !ct.includes("application/json")) { res.writeHead(415).end( JSON.stringify({ jsonrpc:"2.0", error: { code:-32000, message: "Unsupported Media Type: Content-Type must be application/json", }, id:null, }) ); return; } letrawMessage; if(parsedBody !==undefined) { rawMessage = parsedBody; }else{ constparsedCt = contentType.parse(ct); constbody =awaitgetRawBody(req, { limit: MAXIMUM_MESSAGE_SIZE, encoding: parsedCt.parameters.charset ??"utf-8", }); rawMessage =JSON.parse(body.toString()); } letmessages: JSONRPCMessage[]; // handle batch and single messages if(Array.isArray(rawMessage)) { messages = rawMessage.map((msg) =>JSONRPCMessageSchema.parse(msg)); }else{ messages = [JSONRPCMessageSchema.parse(rawMessage)]; } // check if it contains requests consthasRequests = messages.some( (msg) =>"method"inmsg &&"id"inmsg ); consthasOnlyNotifications = messages.every( (msg) =>"method"inmsg && !("id"inmsg) ); if(hasOnlyNotifications) { // if it only contains notifications, return 202 res.writeHead(202).end(); // handle each message for(constmessage of messages) { this.onmessage?.(message); } }elseif(hasRequests) { // Create a unique identifier for this request batch constrequestBatchId = randomUUID(); // Extract the request IDs that we need to collect responses for constrequestIds = messages .filter((msg) =>"method"inmsg &&"id"inmsg) .map((msg) =>String(msg.id)); // Set up a promise that will be resolved with all the responses constresponsePromise =newPromise ((resolve) => { this._pendingRequests.set(requestBatchId, { resolve, responseMessages: [], requestIds, }); }); //Processallmessages for(constmessage of messages) { this.onmessage?.(message); } //Waitforresponsesandsendthem constresponses=awaitPromise.race([ responsePromise, // 30 second timeout newPromise ((resolve) => setTimeout(() => resolve([]), 30000) ), ]); //Cleanupthependingrequest this._pendingRequests.delete(requestBatchId); //Setresponseheaders constheaders:Record = { "Content-Type": "application/json", }; res.writeHead(200, headers); //FormattheresponseaccordingtoJSON-RPCspec constresponseBody=responses.length=== 1 ?responses[0] :responses; res.end(JSON.stringify(responseBody)); } }catch(error) { //returnJSON-RPCformattederror res.writeHead(400).end( JSON.stringify({ jsonrpc: "2.0", error: { code: -32700, message: "Parse error", data:String(error), }, id:null, }) ); this.onerror?.(errorasError); } }
服務(wù)器發(fā)送消息
MCP 服務(wù)器接到客戶(hù)端請(qǐng)求后,把請(qǐng)求數(shù)據(jù)發(fā)到內(nèi)部實(shí)現(xiàn)的各個(gè)功能函數(shù)內(nèi),得到響應(yīng)內(nèi)容后,用 JSON-RPC 編碼,響應(yīng)給 MCP 客戶(hù)端。
Rest Server Transport 發(fā)送消息的主要實(shí)現(xiàn)邏輯:
asyncsend(message: JSONRPCMessage):Promise{ // Only process response messages if(!("id"inmessage) || !("result"inmessage ||"error"inmessage)) { return; } constmessageId =String(message.id); // Find the pending request that is waiting for this response for(const[batchId, pendingRequest] ofthis._pendingRequests.entries()) { if(pendingRequest.requestIds.includes(messageId)) { // Add this response to the collection pendingRequest.responseMessages.push(message); // If we've collected all responses for this batch, resolve the promise if( pendingRequest.responseMessages.length === pendingRequest.requestIds.length ) { pendingRequest.resolve(pendingRequest.responseMessages); } break; } } }
關(guān)閉服務(wù)
Streamable HTTP / SSE 以及我們自定義的 Rest Transport,關(guān)閉服務(wù)的邏輯都是一致的。
因?yàn)榉?wù)啟動(dòng)時(shí)是啟動(dòng)了一個(gè) HTTP 服務(wù)器,所以服務(wù)關(guān)閉時(shí),只需要關(guān)閉這個(gè) HTTP 服務(wù)器即可。
Rest Server Transport 實(shí)現(xiàn)的關(guān)閉服務(wù)的主要邏輯:
asyncstopServer():Promise{ if(this._httpServer) { returnnewPromise((resolve, reject) =>{ this._httpServer!.close((err) =>{ if(err) { reject(err); return; } this._server =null; this._httpServer =null; resolve(); }); }); } }
在實(shí)現(xiàn)完這個(gè)自定義的傳輸后,我們就可以把代碼打包,發(fā)布到 npm 公共倉(cāng)庫(kù)。這樣第三方 MCP 服務(wù)器都可以使用這個(gè) Transport 來(lái)改造自己的實(shí)現(xiàn)。
比如,我用 typescript 實(shí)現(xiàn)的Rest Server Transport封裝在@chatmcp/sdk這個(gè) npm 包里面,第三方服務(wù)器可以這樣引入:
import{ RestServerTransport }from"@chatmcp/sdk/server/index.js";
改造 MCP 服務(wù)器
為了把第三方 MCP 服務(wù)器部署到云端,支持多租戶(hù)并發(fā)調(diào)用,我們需要對(duì)第三方 MCP 服務(wù)器做一定的改造。
以Perplexity Ask Server這個(gè) MCP 服務(wù)器為例,來(lái)演示具體的改造流程:
安裝包含 Rest Server Transport 的 SDK
npm install @chatmcp/sdk
獲取服務(wù)啟動(dòng)參數(shù)
通過(guò)@chatmcp/sdk提供的getParamValue方法,可以獲得 MCP 服務(wù)器啟動(dòng)時(shí)候的參數(shù),從命令行讀取,或者從環(huán)境變量讀?。?/p>
import { getParamValue } from "@chatmcp/sdk/utils/index.js"; const perplexityApiKey = getParamValue("perplexity_api_key") || ""; const mode = getParamValue("mode") || "stdio"; const port = getParamValue("port") || 9593; const endpoint = getParamValue("endpoint") || "/rest";
獲取請(qǐng)求參數(shù)
MCP 服務(wù)器的設(shè)計(jì)初衷,是讓用戶(hù)運(yùn)行在本地,跟私有數(shù)據(jù)打交道。
所以目前絕大部分的 MCP 服務(wù)器,都是不支持多租戶(hù)的(也就是不能讓多個(gè)用戶(hù)共同使用)
主要的限制在于,在 MCP 服務(wù)器內(nèi)部的實(shí)現(xiàn)邏輯里面,用到鑒權(quán)參數(shù)的地方,是在服務(wù)器啟動(dòng)時(shí)獲取的,不是在請(qǐng)求時(shí)動(dòng)態(tài)獲取的。
如果要在云端部署 MCP 服務(wù)器,支持多租戶(hù)使用,我們需要修改 MCP 服務(wù)器內(nèi)部的參數(shù)獲取邏輯,改成從請(qǐng)求參數(shù)里面動(dòng)態(tài)獲取。
MCP 協(xié)議允許在每個(gè)請(qǐng)求的 Request 對(duì)象通過(guò) _meta 字段傳遞自定義的數(shù)據(jù)。
那么就可以改造 MCP 服務(wù)器,從 request.params._meta 里面獲取客戶(hù)端傳遞的 auth 參數(shù)。
在@chatmcp/sdk中,實(shí)現(xiàn)了一個(gè)getAuthValue方法,可以讓 MCP 服務(wù)器獲取 MCP 客戶(hù)端傳遞的鑒權(quán)參數(shù)。
const auth: any = request.params?._meta?.auth;
以這個(gè)Perplexity Ask Server為例,我們把讀取啟動(dòng)時(shí)參數(shù),改成讀取請(qǐng)求時(shí)參數(shù):
// before: get params from env and set as global params // after: get params from env or command line, set as global params import{ getParamValue, getAuthValue }from"@chatmcp/sdk/utils/index.js"; constperplexityApiKey = getParamValue("perplexity_api_key") ||""; constmode = getParamValue("mode") ||"stdio"; constport = getParamValue("port") ||9593; constendpoint = getParamValue("endpoint") ||"/rest"; server.setRequestHandler(CallToolRequestSchema,async(request) => { try{ // before: use global params // after: get auth params from request, use global params if request params not set constapiKey = getAuthValue(request,"PERPLEXITY_API_KEY") || perplexityApiKey; if(!apiKey) { thrownewError("PERPLEXITY_API_KEY not set"); } const{ name,arguments: args } = request.params; if(!args) { thrownewError("No arguments provided"); } switch(name) { case"perplexity_ask": { if(!Array.isArray(args.messages)) { thrownewError( "Invalid arguments for perplexity_ask: 'messages' must be an array" ); } constmessages = args.messages; // before: use global params in every function // const result = await performChatCompletion( // messages, // "sonar-pro" // ); // after: pass params to every function constresult =awaitperformChatCompletion( apiKey, messages, "sonar-pro" ); return{ content: [{type:"text", text: result }], isError:false, }; } // ... } }catch(error) { // Return error details in the response return{ content: [ { type:"text", text:`Error:${ errorinstanceofError? error.message :String(error) }`, }, ], isError:true, }; } });
改造完之后,云端的 MCP 代理,就可以把用戶(hù)設(shè)置的鑒權(quán)參數(shù):perplexity_api_key,通過(guò)這種方式傳給 MCP 服務(wù)器了:
request.params._meta.auth = { perplexity_api_key:"xxx", };
新增 Rest 傳輸
修改這個(gè) MCP 服務(wù)器,在原來(lái)僅支持 stdio 傳輸?shù)幕A(chǔ)上,新增一個(gè) rest http 傳輸:
import{ RestServerTransport }from"@chatmcp/sdk/server/rest.js"; import{ getParamValue }from"@chatmcp/sdk/utils/index.js"; constmode = getParamValue("mode") ||"stdio"; constport = getParamValue("port") ||9593; constendpoint = getParamValue("endpoint") ||"/rest"; asyncfunctionrunServer(){ try{ // after: MCP Server run with rest transport and stdio transport if(mode ==="rest") { consttransport =newRestServerTransport({ port, endpoint, }); awaitserver.connect(transport); awaittransport.startServer(); return; } // before: MCP Server only run with stdio transport consttransport =newStdioServerTransport(); awaitserver.connect(transport); console.error( "Perplexity MCP Server running on stdio with Ask, Research, and Reason tools" ); }catch(error) { console.error("Fatal error running server:", error); process.exit(1); } }
改造之后,這個(gè) MCP 服務(wù)器就支持兩種傳輸機(jī)制了。在不同的場(chǎng)景下,使用不同的傳輸機(jī)制運(yùn)行:
使用 MCP 服務(wù)器
本地使用
通過(guò)源代碼運(yùn)行:
export PERPLEXITY_API_KEY=xxx && node build/index.js
或者使用二進(jìn)制運(yùn)行:
PERPLEXITY_API_KEY=xxx npx -y server-perplexity-ask
或者使用 docker 運(yùn)行:
docker run -i --rm -e PERPLEXITY_API_KEY=xxx mcp/perplexity-ask
這三種方式,都會(huì)在本地啟動(dòng) stdio 通信,在服務(wù)器啟動(dòng)的時(shí)候傳遞 PERPLEXITY_API_KEY 參數(shù)。
云端調(diào)用
改造完的 MCP 服務(wù)器在云端部署的啟動(dòng)命令:
node build/index.js --mode=rest --port=8081 --endpoint=/rest
使用了自定義的 Rest 傳輸,啟動(dòng) HTTP 服務(wù)器,監(jiān)聽(tīng)在本地的 8081 端口。通過(guò)K8S Service創(chuàng)建一個(gè)內(nèi)網(wǎng)訪問(wèn)域名:perplexity-svc
MCP 代理把客戶(hù)端的請(qǐng)求轉(zhuǎn)發(fā)到http://perplexity-svc:8081/rest,就可以處理多個(gè)用戶(hù)的并發(fā)請(qǐng)求了。
MCP 客戶(hù)端僅需要配置一個(gè) MCP 代理的 URL,就可以使用 MCP 服務(wù)器了:
總結(jié)
在本節(jié)中,我們?cè)敿?xì)講解了 MCP 協(xié)議支持的三種標(biāo)準(zhǔn)傳輸機(jī)制以及自定義傳輸?shù)膶?shí)現(xiàn)方式:
stdio 傳輸:
通過(guò)標(biāo)準(zhǔn)輸入/輸出進(jìn)行本地進(jìn)程間通信
優(yōu)勢(shì)在于實(shí)現(xiàn)簡(jiǎn)單、通信速度快、安全性高
主要適用于本地?cái)?shù)據(jù)訪問(wèn)場(chǎng)景
局限于單進(jìn)程通信,資源開(kāi)銷(xiāo)較大
SSE 傳輸(即將廢棄):
基于 HTTP 協(xié)議,支持遠(yuǎn)程資源訪問(wèn)
使用雙通道響應(yīng)機(jī)制(SSE 連接 + POST 端點(diǎn))
存在連接不穩(wěn)定、擴(kuò)展性差、瀏覽器限制等問(wèn)題
不適合無(wú)服務(wù)器(serverless)環(huán)境和云原生架構(gòu)
Streamable HTTP 傳輸:
替代 SSE 的新傳輸機(jī)制,兼容現(xiàn)代云架構(gòu)
更靈活的連接模式,支持簡(jiǎn)單請(qǐng)求-響應(yīng)和流式傳輸
標(biāo)準(zhǔn)化的會(huì)話管理和斷點(diǎn)恢復(fù)功能
適合遠(yuǎn)程訪問(wèn)、無(wú)服務(wù)器環(huán)境和大規(guī)模部署
自定義傳輸機(jī)制:
MCP 協(xié)議支持實(shí)現(xiàn)自定義傳輸以滿(mǎn)足特定需求
可以針對(duì)特定部署環(huán)境和使用場(chǎng)景進(jìn)行優(yōu)化
示例中實(shí)現(xiàn)了一個(gè) Rest Transport,適合無(wú)狀態(tài)、短連接場(chǎng)景
傳輸機(jī)制的選擇應(yīng)基于具體應(yīng)用場(chǎng)景:
本地?cái)?shù)據(jù)訪問(wèn)優(yōu)先選擇 stdio 傳輸
遠(yuǎn)程資源訪問(wèn)優(yōu)先選擇 Streamable HTTP 傳輸
特殊需求場(chǎng)景可考慮實(shí)現(xiàn)自定義傳輸
MCP 協(xié)議的可插拔傳輸架構(gòu)使其能夠靈活適應(yīng)不同的部署環(huán)境和通信需求,從簡(jiǎn)單的本地工具到復(fù)雜的云服務(wù)均可支持。隨著技術(shù)發(fā)展,傳輸機(jī)制也在不斷優(yōu)化,以提供更好的性能、可靠性和可擴(kuò)展性。
-
通信
+關(guān)注
關(guān)注
18文章
6139瀏覽量
137124 -
服務(wù)器
+關(guān)注
關(guān)注
12文章
9578瀏覽量
86922 -
MCP
+關(guān)注
關(guān)注
0文章
262瀏覽量
14165 -
傳輸機(jī)制
+關(guān)注
關(guān)注
0文章
3瀏覽量
1203
原文標(biāo)題:詳解 MCP 傳輸機(jī)制
文章出處:【微信號(hào):OSC開(kāi)源社區(qū),微信公眾號(hào):OSC開(kāi)源社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
一文詳解MCP存儲(chǔ)器的結(jié)構(gòu)原理
一文詳解藍(lán)牙模塊原理與結(jié)構(gòu)
一文詳解差分傳輸的噪聲抑制資料下載

評(píng)論