不管是服務(wù)導(dǎo)出還是服務(wù)引入,都發(fā)生在應(yīng)用啟動過程中,比如:在啟動類上加上 @EnableDubbo 時,該注解上有一個 @DubboComponentScan 注解,@DubboComponentScan 注解 Import 了一個 DubboComponentScanRegistrar,DubboComponentScanRegistrar 中會調(diào)用 DubboSpringInitializer.initialize(),該方法中會注冊一個 DubboDeployApplicationListener,而 DubboDeployApplicationListener 會監(jiān)聽 Spring 容器啟動完成事件 ContextRefreshedEvent,一旦接收到這個事件后,就會開始 Dubbo 的啟動流程,就會執(zhí)行 DefaultModuleDeployer 的 start() 進(jìn)行服務(wù)導(dǎo)出與服務(wù)引入。
在啟動過程中,在做完服務(wù)導(dǎo)出與服務(wù)引入后,還會做幾件非常重要的事情:
1.導(dǎo)出一個應(yīng)用元數(shù)據(jù)服務(wù)(就是一個 MetadataService 服務(wù),這個服務(wù)也會注冊到注冊中心),或者將應(yīng)用元數(shù)據(jù)注冊到元數(shù)據(jù)中心;
2.生成當(dāng)前應(yīng)用的實(shí)例信息對象 ServiceInstance,比如:應(yīng)用名、實(shí)例 ip、實(shí)例 port,并將實(shí)例信息注冊到注冊中心,也就是應(yīng)用級注冊;
服務(wù)導(dǎo)出
當(dāng)在某個接口的實(shí)現(xiàn)類上加上 @DubboService 后,就表示定義了一個 Dubbo 服務(wù),應(yīng)用啟動時 Dubbo 只要掃描到了 @DubboService,就會解析對應(yīng)的類,得到服務(wù)相關(guān)的配置信息,比如:
1.服務(wù)的類型,也就是接口,接口名就是服務(wù)名;
2.服務(wù)的具體實(shí)現(xiàn)類,也就是當(dāng)前類;
3.服務(wù)的 version、timeout 等信息,就是 @DubboService 中所定義的各種配置;
解析完服務(wù)的配置信息后,就會把這些配置信息封裝成為一個 ServiceConfig 對象,并調(diào)用其 export() 進(jìn)行服務(wù)導(dǎo)出,此時一個 ServiceConfig 對象就表示一個Dubbo 服務(wù)。
而所謂的服務(wù)導(dǎo)出,主要就是完成三件事情:
1.確定服務(wù)的最終參數(shù)配置;
2.按不同協(xié)議啟動對應(yīng)的 Server(服務(wù)暴露);
3.將服務(wù)注冊到注冊中心(服務(wù)注冊);
確定服務(wù)參數(shù)
一個 Dubbo 服務(wù),除開服務(wù)的名字,也就是接口名,還會有很多其他的屬性,比如:超時時間、版本號、服務(wù)所屬應(yīng)用名、所支持的協(xié)議及綁定的端口等眾多信息。
但是,通常這些信息并不會全部在 @DubboService 中進(jìn)行定義,比如:一個 Dubbo 服務(wù)肯定是屬于某個應(yīng)用的,而一個應(yīng)用下可以有多個 Dubbo 服務(wù),所以可以在應(yīng)用級別定義一些通用的配置,比如協(xié)議。
在 application.yml 中定義:
dubbo:
application:
name: dubbo-springboot-demo-provider
protocol:
name: tri
port: 20880
表示當(dāng)前應(yīng)用下所有的 Dubbo 服務(wù)都支持通過 tri 協(xié)議進(jìn)行訪問,并且訪問端口為 20880,所以在進(jìn)行某個服務(wù)的服務(wù)導(dǎo)出時,就需要將應(yīng)用中的這些配置信息合并到當(dāng)前服務(wù)的配置信息中。
另外,除開可以通過 @DubboService 來配置服務(wù),也可以在配置中心對服務(wù)進(jìn)行配置,比如:在配置中心中配置:
dubbo.service.org.apache.dubbo.samples.api.DemoService.timeout=5000
表示當(dāng)前服務(wù)的超時時間為 5s。
所以,在服務(wù)導(dǎo)出時,也需要從配置中心獲取當(dāng)前服務(wù)的配置,如果在 @DubboService 中也定義了 timeout,那么就用配置中心的覆蓋掉,配置中心的配置優(yōu)先級更高。
最終確定出服務(wù)的各種參數(shù),這塊內(nèi)容和 Dubbo2.7 一致。
服務(wù)注冊
當(dāng)確定好了最終的服務(wù)配置后,Dubbo 就會根據(jù)這些配置信息生成對應(yīng)的服務(wù) URL,比如:
tri://192.168.65.221:20880/org.apache.dubbo.springboot.demo.DemoService?application=dubbo-springboot-demo-provider&timeout=3000
這個 URL 就表示了一個 Dubbo 服務(wù),服務(wù)消費(fèi)者只要能獲得到這個服務(wù) URL,就知道了關(guān)于這個 Dubbo 服務(wù)的全部信息,包括服務(wù)名、支持的協(xié)議、ip、port、各種配置。
確定了服務(wù) URL 之后,服務(wù)注冊要做的事情就是把這個服務(wù) URL 存到注冊中心(比如:Zookeeper)中去,說的再簡單一點(diǎn),就是把這個字符串存到 Zookeeper中去,這個步驟其實(shí)是非常簡單的,實(shí)現(xiàn)這個功能的源碼在 RegistryProtocol 中的 export() 方法中,最終服務(wù) URL 存在了 Zookeeper 的 /dubbo/ 接口名 /providers 目錄下。
但是服務(wù)注冊并不僅僅就這么簡單,既然上面的這個 URL 表示一個服務(wù),并且還包括了服務(wù)的一些配置信息,那這些配置信息如果改變了呢?比如:利用 Dubbo管理臺中的動態(tài)配置功能(注意,并不是配置中心)來修改服務(wù)配置,動態(tài)配置可以應(yīng)用運(yùn)行過程中動態(tài)的修改服務(wù)的配置,并實(shí)時生效。
如果利用動態(tài)配置功能修改了服務(wù)的參數(shù),那此時就要重新生成服務(wù) URL 并重新注冊到注冊中心,這樣服務(wù)消費(fèi)者就能及時的獲取到服務(wù)配置信息。
而對于服務(wù)提供者而言,在服務(wù)注冊過程中,還需要能監(jiān)聽到動態(tài)配置的變化,一旦發(fā)生了變化,就根據(jù)最新的配置重新生成服務(wù) URL,并重新注冊到中心。
應(yīng)用級注冊
在 Dubbo3.0 之前,Dubbo 是接口級注冊,服務(wù)注冊就是把接口名以及服務(wù)配置信息注冊到注冊中心中,注冊中心存儲的數(shù)據(jù)格式大概為:
接口名1:tri://192.168.1.221:20880/接口名1?application=應(yīng)用名
接口名2:tri://192.168.1.221:20880/接口名2?application=應(yīng)用名
接口名3:tri://192.168.1.221:20880/接口名3?application=應(yīng)用名
key 是接口名,value 就是服務(wù) URL,上面的內(nèi)容就表示現(xiàn)在有一個應(yīng)用,該應(yīng)用下有 3 個接口,應(yīng)用實(shí)例部署在 192.168.1.221,此時,如果給該應(yīng)用增加一個實(shí)例,實(shí)例 ip 為192.168.1.222,那么新的實(shí)例也需要進(jìn)行服務(wù)注冊,會向注冊中心新增 3 條數(shù)據(jù):
接口名1:tri://192.168.1.221:20880/接口名1?application=應(yīng)用名
接口名2:tri://192.168.1.221:20880/接口名2?application=應(yīng)用名
接口名3:tri://192.168.1.221:20880/接口名3?application=應(yīng)用名
接口名1:tri://192.168.1.222:20880/接口名1?application=應(yīng)用名
接口名2:tri://192.168.1.222:20880/接口名2?application=應(yīng)用名
接口名3:tri://192.168.1.222:20880/接口名3?application=應(yīng)用名
可以發(fā)現(xiàn),如果一個應(yīng)用中有 3 個 Dubbo 服務(wù),那么每增加一個實(shí)例,就會向注冊中心增加 3 條記錄,那如果一個應(yīng)用中有 10 個 Dubbo 服務(wù),那么每增加一個實(shí)例,就會向注冊中心增加 10 條記錄,注冊中心的壓力會隨著應(yīng)用實(shí)例的增加而劇烈增加。
反過來,如果一個應(yīng)用有 3 個 Dubbo 服務(wù),5 個實(shí)例,那么注冊中心就有 15 條記錄,此時增加一個 Dubbo 服務(wù),那么注冊中心就會新增 5 條記錄,注冊中心的壓力也會劇烈增加。
注冊中心的數(shù)據(jù)越多,數(shù)據(jù)就變化的越頻繁,比如:修改服務(wù)的 timeout,那么對于注冊中心和應(yīng)用都需要消耗資源用來處理數(shù)據(jù)變化。
所以為了降低注冊中心的壓力,Dubbo3.0 支持了應(yīng)用級注冊,同時也兼容接口級注冊,用戶可以逐步遷移成應(yīng)用級注冊,而一旦采用應(yīng)用級注冊,最終注冊中心的數(shù)據(jù)存儲就變成為:
應(yīng)用名:192.168.1.221:20880
應(yīng)用名:192.168.1.222:20880
表示在注冊中心中,只記錄應(yīng)用所對應(yīng)的實(shí)例信息(IP + 綁定的端口),這樣只有一個應(yīng)用的實(shí)例增加了,那么注冊中心的數(shù)據(jù)才會增加,而不關(guān)心一個應(yīng)用中到底有多少個 Dubbo 服務(wù)。
這樣帶來的好處就是,注冊中心存儲的數(shù)據(jù)變少了,注冊中心中數(shù)據(jù)的變化頻率變小了,并且使用應(yīng)用級注冊,使得 Dubbo3 能實(shí)現(xiàn)與異構(gòu)微服務(wù)體系如:Spring Cloud、Kubernetes Service 等在地址發(fā)現(xiàn)層面更容易互通, 為連通 Dubbo 與其他微服務(wù)體系提供可行方案。
應(yīng)用級注冊帶來了好處,但是對于 Dubbo 來說又出現(xiàn)了一些新的問題,比如:原本,服務(wù)消費(fèi)者可以直接從注冊中心就知道某個 Dubbo 服務(wù)的所有服務(wù)提供者以及相關(guān)的協(xié)議、ip、port、配置等信息,那現(xiàn)在注冊中心上只有 ip、port,那對于服務(wù)消費(fèi)者而言:服務(wù)消費(fèi)者怎么知道現(xiàn)在它要用的某個 Dubbo 服務(wù),也就是某個接口對應(yīng)的應(yīng)用是哪個呢?
對于這個問題,在進(jìn)行服務(wù)導(dǎo)出的過程中,會在 Zookeeper 中存一個映射關(guān)系,在服務(wù)導(dǎo)出的最后一步,在 ServiceConfig 的 exported() 方法中,會保存這個映射關(guān)系:接口名:應(yīng)用名
。這個映射關(guān)系存在 Zookeeper 的 /dubbo/mapping 目錄下,存了這個信息后,消費(fèi)者就能根據(jù)接口名找到所對應(yīng)的應(yīng)用名了。
消費(fèi)者知道了要使用的 Dubbo 服務(wù)在哪個應(yīng)用,那也就能從注冊中心中根據(jù)應(yīng)用名查到應(yīng)用的所有實(shí)例信息( ip + port ),也就是可以發(fā)送方法調(diào)用請求了,但是在真正發(fā)送請求之前,還得知道服務(wù)的配置信息,對于消費(fèi)者而言,它得知道當(dāng)前要調(diào)用的這個 Dubbo 服務(wù)支持什么協(xié)議、timeout 是多少,那服務(wù)的配置信息從哪里獲取呢?
之前的服務(wù)配置信息是直接從注冊中心就可以獲取到的,就是服務(wù) URL 后面,但是現(xiàn)在不行了,現(xiàn)在需要從服務(wù)提供者的元數(shù)據(jù)服務(wù)獲取,前面提到過,在應(yīng)用啟動過程中會進(jìn)行服務(wù)導(dǎo)出和服務(wù)引入,然后就會暴露一個應(yīng)用元數(shù)據(jù)服務(wù),其實(shí)這個應(yīng)用元數(shù)據(jù)服務(wù)就是一個 Dubbo 服務(wù)(Dubbo 框架內(nèi)置的,自己實(shí)現(xiàn)的 ),消費(fèi)者可以調(diào)用這個服務(wù)來獲取某個應(yīng)用中所提供的所有 Dubbo 服務(wù)以及服務(wù)配置信息,這樣也就能知道服務(wù)的配置信息了。
知道了應(yīng)用注冊的好處,以及相關(guān)問題的解決方式,那么來看它到底是如何實(shí)現(xiàn)的。
首先,我們可以通過配置 dubbo.application.register-mode 來控制:
1.instance:表示只進(jìn)行應(yīng)用級注冊;
2.interface:表示只進(jìn)行接口級注冊;
3.all:表示應(yīng)用級注冊和接口級注冊都進(jìn)行,默認(rèn);
不管是什么注冊,都需要存數(shù)據(jù)到注冊中心,而 Dubbo3 的源碼實(shí)現(xiàn)中會根據(jù)所配置的注冊中心生成兩個 URL(不是服務(wù) URL,可以理解為注冊中心 URL,用來訪問注冊中心的):
1.service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbospringboot-demoprovider& dubbo=2.0.2&pid=13072&qos.enable=false?istry=zookeeper×tamp=1651755501660
2.registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-springboot-demoprovider& dubbo=2.0.2&pid=13072&qos.enable=false?istry=zookeeper×tamp=1651755501660
這兩個 URL 只有 schema 不一樣,一個是 service-discovery-registry,一個是 registry,而 registry 是 Dubbo3 之前就存在的,也就代表接口級服務(wù)注冊,而service-discovery-registry 就表示應(yīng)用級服務(wù)注冊。
在服務(wù)注冊相關(guān)的源碼中,當(dāng)調(diào)用 RegistryProtocol 的 export() 方法處理 registry:// 時,會利用 ZookeeperRegistry 把服務(wù) URL 注冊到 Zookeeper 中去,這就是接口級注冊。
而類似,當(dāng)調(diào)用 RegistryProtocol 的 export() 方法處理 service-discovery-registry:// 時,會利用 ServiceDiscoveryRegistry 來進(jìn)行相關(guān)邏輯的處理,那是不是就是在這里把應(yīng)用信息注冊到注冊中心去呢?并沒有這么簡單。
1.首先,不可能每導(dǎo)出一個服務(wù)就進(jìn)行一次應(yīng)用注冊,太浪費(fèi)了,應(yīng)用注冊只要做一次就行了
2.其次,如果一個應(yīng)用支持了多個端口,那么應(yīng)用注冊時只要挑選其中一個端口作為實(shí)例端口就可以了(該端口只要能接收到數(shù)據(jù)就行)
3.前面提到,應(yīng)用啟動過程中要暴露應(yīng)用元數(shù)據(jù)服務(wù),所以在此處也還是要收集當(dāng)前所暴露的服務(wù)配置信息,以提供給應(yīng)用元數(shù)據(jù)服務(wù)
所以 ServiceDiscoveryRegistry 在注冊一個服務(wù) URL 時,并不會往注冊中心存數(shù)據(jù),而只是把服務(wù) URL 存到到一個 MetadataInfo 對象中,MetadataInfo 對象中就保存了當(dāng)前應(yīng)用中所有的 Dubbo 服務(wù)信息:服務(wù)名、支持的協(xié)議、綁定的端口、timeout 等。
前面提到過,在應(yīng)用啟動的最后,才會進(jìn)行應(yīng)用級注冊,而應(yīng)用級注冊就是當(dāng)前的應(yīng)用實(shí)例上相關(guān)的信息存入注冊中心,包括:
1.應(yīng)用的名字;
2.獲取應(yīng)用元數(shù)據(jù)的方式;
3.當(dāng)前實(shí)例的 ip 和 port;
4.當(dāng)前實(shí)例支持哪些協(xié)議以及對應(yīng)的 port;
比如:
{
"name":"dubbo-springboot-demo-provider",
"id":"192.168.65.221:20882",
"address":"192.168.65.221",
"port":20882,
"sslPort":null,
"payload":{
"@class":"org.apache.dubbo.registry.zookeeper.ZookeeperInstance",
"id":"192.168.65.221:20882",
"name":"dubbo-springboot-demo-provider",
"metadata":{
"dubbo.endpoints":"[{"port":20882,"protocol":"dubbo"},{"port":50051,"protocol":"tri"}]",
"dubbo.metadata-service.url-params":"{"connections":"1","version":"1.0.0","dubbo":"2.0.2","side":"provider","port":"20882","protocol":"dubbo"}",
"dubbo.metadata.revision":"65d5c7b814616ab10d32860b54781686",
"dubbo.metadata.storage-type":"local"
}
},
"registrationTimeUTC":1654585977352,
"serviceType":"DYNAMIC",
"uriSpec":null
}
一個實(shí)例上可能支持多個協(xié)議以及多個端口,那如何確定實(shí)例的 ip 和端口呢?
答案是:獲取 MetadataInfo 對象中保存的所有服務(wù) URL,優(yōu)先取 dubbo 協(xié)議對應(yīng) ip 和 port,沒有 dubbo 協(xié)議則所有服務(wù) URL 中的第一個 URL 的 ip 和 port。
另外一個協(xié)議一般只會對應(yīng)一個端口,但是如何就是對應(yīng)了多個,比如:
dubbo:
application:
name: dubbo-springboot-demo-provider
protocols:
p1:
name: dubbo
port: 20881
p2:
name: dubbo
port: 20882
p3:
name: tri
port: 50051
如果是這樣,最終存入 endpoint 中的會保證一個協(xié)議只對應(yīng)一個端口,另外那個將被忽略,最終服務(wù)消費(fèi)者在進(jìn)行服務(wù)引入時將會用到這個 endpoint 信息。
確定好實(shí)例信息后之后,就進(jìn)行最終的應(yīng)用注冊了,就把實(shí)例信息存入注冊中心的 /services/應(yīng)用名,目錄下:
?
??
可以看出 services 節(jié)點(diǎn)下存的是應(yīng)用名,應(yīng)用名的節(jié)點(diǎn)下存的是實(shí)例 ip 和實(shí)例 port,而 ip 和 port 這個節(jié)點(diǎn)中的內(nèi)容就是實(shí)例的一些基本信息。
額外,我們可以配置 dubbo.metadata.storage-type,默認(rèn)時 local,可以通過配置改為 remote:
dubbo:
application:
name: dubbo-springboot-demo-provider
metadata-type: remote
這個配置其實(shí)跟應(yīng)用元數(shù)據(jù)服務(wù)有關(guān)系:
1.如果為 local,那就會啟用應(yīng)用元數(shù)據(jù)服務(wù),最終服務(wù)消費(fèi)者就會調(diào)用元數(shù)據(jù)服務(wù)獲取到應(yīng)用元數(shù)據(jù)信息;
2.如果為 remote,那就不會暴露應(yīng)用元數(shù)據(jù)服務(wù),那么服務(wù)消費(fèi)者從元數(shù)據(jù)中心獲取應(yīng)用元數(shù)據(jù);
在 Dubbo2.7 中就有了元數(shù)據(jù)中心,它其實(shí)就是用來減輕注冊中心的壓力的,Dubbo 會把服務(wù)信息完整的存一份到元數(shù)據(jù)中心,元數(shù)據(jù)中心也可以用 Zookeeper來實(shí)現(xiàn),在暴露完元數(shù)據(jù)服務(wù)之后,在注冊實(shí)例信息到注冊中心之前,就會把 MetadataInfo 存入元數(shù)據(jù)中心,比如:
?
??
節(jié)點(diǎn)內(nèi)容為:
{
"app":"dubbo-springboot-demo-provider",
"revision":"64e68950e300068e6b5f8632d9fd141d",
"services":{
"org.apache.dubbo.springboot.demo.HelloService:tri":{
"name":"org.apache.dubbo.springboot.demo.HelloService",
"protocol":"tri",
"path":"org.apache.dubbo.springboot.demo.HelloService",
"params":{
"side":"provider",
"release":"",
"methods":"sayHello",
"deprecated":"false",
"dubbo":"2.0.2",
"interface":"org.apache.dubbo.springboot.demo.HelloService",
"service-name-mapping":"true",
"generic":"false",
"metadata-type":"remote",
"application":"dubbo-springboot-demo-provider",
"background":"false",
"dynamic":"true",
"anyhost":"true"
}
},
"org.apache.dubbo.springboot.demo.DemoService:tri":{
"name":"org.apache.dubbo.springboot.demo.DemoService",
"protocol":"tri",
"path":"org.apache.dubbo.springboot.demo.DemoService",
"params":{
"side":"provider",
"release":"",
"methods":"sayHelloStream,sayHello,sayHelloServerStream",
"deprecated":"false",
"dubbo":"2.0.2",
"interface":"org.apache.dubbo.springboot.demo.DemoService",
"service-name-mapping":"true",
"generic":"false",
"metadata-type":"remote",
"application":"dubbo-springboot-demo-provider",
"background":"false",
"dynamic":"true",
"anyhost":"true"
}
}
}
}
這里面就記錄了當(dāng)前實(shí)例上提供了哪些服務(wù)以及對應(yīng)的協(xié)議,注意并沒有保存對應(yīng)的端口......,所以后面服務(wù)消費(fèi)者得利用實(shí)例信息中的 endpoint,因?yàn)閑ndpoint 中記錄了協(xié)議對應(yīng)的端口....
其實(shí)元數(shù)據(jù)中心和元數(shù)據(jù)服務(wù)提供的功能是一樣的,都可以用來獲取某個實(shí)例的 MetadataInfo,上面中的 UUID 表示實(shí)例編號,只不過元數(shù)據(jù)中心是集中式的,元數(shù)據(jù)服務(wù)式分散在各個提供者實(shí)例中的,如果整個微服務(wù)集群壓力不大,那么效果差不多,如果微服務(wù)集群壓力大,那么元數(shù)據(jù)中心的壓力就大,此時單個元數(shù)據(jù)服務(wù)就更適合,所以默認(rèn)也是采用的元數(shù)據(jù)服務(wù)。
至此,應(yīng)用級服務(wù)注冊的原理就分析完了,總結(jié)一下:
1.在導(dǎo)出某個 Dubbo 服務(wù) URL 時,會把服務(wù) URL 存入 MetadataInfo 中;
2.導(dǎo)出完某個 Dubbo 服務(wù)后,就會把服務(wù)接口名:應(yīng)用名存入元數(shù)據(jù)中心(可以用 Zookeeper 實(shí)現(xiàn));
3.導(dǎo)出所有服務(wù)后,完成服務(wù)引入后;
4.判斷要不要啟動元數(shù)據(jù)服務(wù),如果要就進(jìn)行導(dǎo)出,固定使用 Dubbo 協(xié)議;
5.將 MetadataInfo 存入元數(shù)據(jù)中心;
6.確定當(dāng)前實(shí)例信息(應(yīng)用名、ip、port、endpoint);
7.將實(shí)例信息存入注冊中心,完成應(yīng)用注冊;
服務(wù)暴露
服務(wù)暴露就是根據(jù)不同的協(xié)議啟動不同的 Server,比如:dubbo 和 tri 協(xié)議啟動的都是 Netty,像 Dubbo2.7 中的 http 協(xié)議啟動的就是 Tomcat,這塊在服務(wù)調(diào)用的時候再來分析。
服務(wù)引入
@DubboReference
private DemoService demoService;
需要利用 @DubboReference 注解來引入某一個 Dubbo 服務(wù),應(yīng)用在啟動過程中,進(jìn)行完服務(wù)導(dǎo)出之后,就會進(jìn)行服務(wù)引入,屬性的類型就是一個 Dubbo 服務(wù)接口,而服務(wù)引入最終要做到的就是給這個屬性賦值一個接口代理對象。
在 Dubbo2.7 中,只有接口級服務(wù)注冊,服務(wù)消費(fèi)者會利用接口名從注冊中心找到該服務(wù)接口所有的服務(wù) URL,服務(wù)消費(fèi)者會根據(jù)每個服務(wù) URL 的 protocol、ip、port 生成對應(yīng)的 Invoker 對象,比如生成 TripleInvoker、DubboInvoker 等,調(diào)用這些 Invoker 的 invoke() 方法就會發(fā)送數(shù)據(jù)到對應(yīng)的 ip、port,生成好所有的 Invoker 對象之后,就會把這些 Invoker 對象進(jìn)行封裝并生成一個服務(wù)接口的代理對象,代理對象調(diào)用某個方法時,會把所調(diào)用的方法信息生成一個 Invocation 對象,并最終通過某一個 Invoker 的 invoke() 方法把 Invocation 對象發(fā)送出去,所以代理對象中的 Invoker 對象是關(guān)鍵,服務(wù)引入最核心的就是要生成這些 Invoker 對象。
Invoker 是非常核心的一個概念,也有非常多種類,比如:
1.TripleInvoker:表示利用 tri 協(xié)議把 Invocation 對象發(fā)送出去;
2.DubboInvoker:表示利用 dubbo 協(xié)議把 Invocation 對象發(fā)送出去;
3.ClusterInvoker:有負(fù)載均衡功能;
4.MigrationInvoker:遷移功能,Dubbo3.0 新增的
像 TripleInvoker 和 DubboInvoker 對應(yīng)的就是具體服務(wù)提供者,包含了服務(wù)提供者的 ip 地址和端口,并且會負(fù)責(zé)跟對應(yīng)的 ip 和 port 建立 Socket 連接,后續(xù)就可以基于這個 Socket 連接并按協(xié)議格式發(fā)送 Invocation 對象。
比如現(xiàn)在引入了 DemoService 這個服務(wù),那如果該服務(wù)支持:
1.一個 tri 協(xié)議,綁定的端口為 20881;
2.一個 tri 協(xié)議,綁定的端口為 20882;
3.一個 dubbo 協(xié)議,綁定的端口為 20883;
那么在服務(wù)消費(fèi)端這邊,就會生成兩個 TripleInvoker 和一個 DubboInvoker,代理對象執(zhí)行方法時就會進(jìn)行負(fù)載均衡選擇其中一個 Invoker 進(jìn)行調(diào)用。
接口級服務(wù)引入
在服務(wù)導(dǎo)出時,Dubbo3.0 默認(rèn)情況下即會進(jìn)行接口級注冊,也會進(jìn)行應(yīng)用級注冊,目的就是為了兼容服務(wù)消費(fèi)者應(yīng)用,用的還是 Dubbo2.7,用 Dubbo2.7 就只能老老實(shí)實(shí)的進(jìn)行接口級服務(wù)引入。
接口級服務(wù)引入核心就是要找到當(dāng)前所引入的服務(wù)有哪些服務(wù) URL,然后根據(jù)每個服務(wù) URL 生成對應(yīng)的 Invoker,流程為:
1.首先,根據(jù)當(dāng)前引入的服務(wù)接口生成一個 RegistryDirectory 對象,表示動態(tài)服務(wù)目錄,用來查詢并緩存服務(wù)提供者信息;
2.RegistryDirectory 對象會根據(jù)服務(wù)接口名去注冊中心,比如:Zookeeper 中的 /dubbo/服務(wù)接口名/providers/ 節(jié)點(diǎn)下查找所有的服務(wù) URL;
3.根據(jù)每個服務(wù) URL 生成對應(yīng)的 Invoker 對象,并把 Invoker 對象存在 RegistryDirectory 對象的 invokers 屬性中;
4.RegistryDirectory 對象也會監(jiān)聽 /dubbo/服務(wù)接口名/providers/ 節(jié)點(diǎn)的數(shù)據(jù)變化,一旦發(fā)生了變化就要進(jìn)行相應(yīng)的改變;
5.最后將 RegistryDirectory 對象生成一個 ClusterInvoker 對象,到時候調(diào)用 ClusterInvoker 對象的 invoke() 方法就會進(jìn)行負(fù)載均衡選出某一個 Invoker 進(jìn)行調(diào)用;
?
??
應(yīng)用級服務(wù)引入
在 Dubbo 中,應(yīng)用級服務(wù)引入,并不是指引入某個應(yīng)用,這里和 SpringCloud 是有區(qū)別的,在 SpringCloud 中,服務(wù)消費(fèi)者只要從注冊中心找到要調(diào)用的應(yīng)用的所有實(shí)例地址就可以了,但是在 Dubbo 中找到應(yīng)用的實(shí)例地址還遠(yuǎn)遠(yuǎn)不夠,因?yàn)樵?Dubbo 中是直接使用的接口,所以在 Dubbo 中就算是應(yīng)用級服務(wù)引入,最終還是得找到服務(wù)接口有哪些服務(wù)提供者。
所以,對于服務(wù)消費(fèi)者而言,不管是使用接口級服務(wù)引入,還是應(yīng)用級服務(wù)引入,最終的結(jié)果應(yīng)該得是一樣的,也就是某個服務(wù)接口的提供者 Invoker 是一樣的,不可能使用應(yīng)用級服務(wù)引入得到的 Invoker 多一個或少一個,但是!?。。壳皶星闆r不一致,就是一個協(xié)議有多個端口時,比如在服務(wù)提供者應(yīng)用這邊支持:
dubbo:
application:
name: dubbo-springboot-demo-provider
protocols:
p1:
name: dubbo
port: 20881
p2:
name: tri
port: 20882
p3:
name: tri
port: 50051
那么在消費(fèi)端進(jìn)行服務(wù)引入時,比如:引入 DemoService 時,接口級服務(wù)引入會生成 3 個 Invoker(2個 TripleInvoker,1個DubboInvoker),而應(yīng)用級服務(wù)引入只會生成 2 個 Invoker(1個TripleInvoker,1個DubboInvoker),原因就是在進(jìn)行應(yīng)用級注冊時是按照一個協(xié)議對應(yīng)一個port存的。
那既然接口級服務(wù)引入和應(yīng)用級服務(wù)引入最終的結(jié)果差不多,可能就不理解了,那應(yīng)用級服務(wù)引入有什么好處呢?要知道應(yīng)用級服務(wù)引入和應(yīng)用級服務(wù)注冊是對應(yīng),服務(wù)提供者應(yīng)用如果只做應(yīng)用級注冊,那么對應(yīng)的服務(wù)消費(fèi)者就只能進(jìn)行應(yīng)用級服務(wù)引入,好處就是前面所說的,減輕了注冊中心的壓力等,那么帶來的影響就是服務(wù)消費(fèi)者端尋找服務(wù) URL 的邏輯更復(fù)雜了。
只要找到了當(dāng)前引入服務(wù)對應(yīng)的服務(wù) URL,然后生成對應(yīng)的 Invoker,并最終生成一個 ClusterInvoker。
在進(jìn)行應(yīng)用級服務(wù)引入時:
1.首先,根據(jù)當(dāng)前引入的服務(wù)接口生成一個 ServiceDiscoveryRegistryDirectory 對象,表示動態(tài)服務(wù)目錄,用來查詢并緩存服務(wù)提供者信息;
2.根據(jù)接口名去獲取 /dubbo/mapping/服務(wù)接口名節(jié)點(diǎn)的內(nèi)容,拿到的就是該接口所對應(yīng)的應(yīng)用名;
3.有了應(yīng)用名之后,再去獲取 /services/應(yīng)用名節(jié)點(diǎn)下的實(shí)例信息;
4.依次遍歷每個實(shí)例,每個實(shí)例都有一個編號 revision
5.根據(jù) metadata-type 進(jìn)行判斷如果是 local:則調(diào)用實(shí)例上的元數(shù)據(jù)服務(wù)獲取應(yīng)用元數(shù)據(jù)(MetadataInfo);如果是 remote:則根據(jù)應(yīng)用名從元數(shù)據(jù)中心獲取應(yīng)用元數(shù)據(jù)(MetadataInfo);
6.獲取到應(yīng)用元數(shù)據(jù)之后就進(jìn)行緩存,key 為 revision,MetadataInfo 對象為 value
7.這里為什么要去每個實(shí)例上獲取應(yīng)用的元數(shù)據(jù)信息呢?因?yàn)橛锌赡懿灰粯樱m然是同一個應(yīng)用,但是在運(yùn)行不同的實(shí)例的時候,可以指定不同的參數(shù),比如不同的協(xié)議,不同的端口,雖然在生產(chǎn)上基本不會這么做,但是 Dubbo 還是支持了這種情況
8.根據(jù)從所有實(shí)例上獲取到的 MetadataInfo 以及 endpoint 信息,就能知道所有實(shí)例上所有的服務(wù) URL(注意:一個接口+一個協(xié)議+一個實(shí)例 : 對應(yīng)一個服務(wù)URL)
9.拿到了這些服務(wù) URL 之后,就根據(jù)當(dāng)前引入服務(wù)的信息進(jìn)行過濾,會根據(jù)引入服務(wù)的接口名+協(xié)議名,消費(fèi)者可以在 @DubboReference 中指定協(xié)議,表示只使用這個協(xié)議調(diào)用當(dāng)前服務(wù),如果沒有指定協(xié)議,那么就會去獲取 tri、dubbo、rest 這三個協(xié)議對應(yīng)的服務(wù) URL(Dubbo3.0 默認(rèn)只支持這三個協(xié)議)
10.這樣,經(jīng)過過濾之后,就得到了當(dāng)前所引入的服務(wù)對應(yīng)的服務(wù) URL 了
11.根據(jù)每個服務(wù) URL 生成對應(yīng)的 Invoker 對象,并把 Invoker 對象存在 ServiceDiscoveryRegistryDirectory 對象的 invokers 屬性中
12.最后將 ServiceDiscoveryRegistryDirectory 對象生成一個 ClusterInvoker 對象,到時候調(diào)用 ClusterInvoker 對象的 invoke() 方法就會進(jìn)行負(fù)載均衡選出某一個 Invoker 進(jìn)行調(diào)用
MigrationInvoker 的生成
上面分析了接口級服務(wù)引入和應(yīng)用級服務(wù)引入,最終都是得到某個服務(wù)對應(yīng)的服務(wù)提供者 Invoker,那最終進(jìn)行服務(wù)調(diào)用時,到底該怎么選擇呢?
所以在 Dubbo3.0 中,可以配置:
# dubbo.application.service-discovery.migration 僅支持通過 -D 以及 全局配置中心 兩種方式進(jìn)行配置。
dubbo.application.service-discovery.migration=APPLICATION_FIRST
# 可選值
# FORCE_INTERFACE,強(qiáng)制使用接口級服務(wù)引入
# FORCE_APPLICATION,強(qiáng)制使用應(yīng)用級服務(wù)引入
# APPLICATION_FIRST,智能選擇是接口級還是應(yīng)用級,默認(rèn)就是這個
對于前兩種強(qiáng)制的方式,沒什么特殊,就是上面走上面分析的兩個過程,沒有額外的邏輯,那對于 APPLICATION_FIRST 就需要有額外的邏輯了,也就是 Dubbo 要判斷,當(dāng)前所引入的這個服務(wù),應(yīng)該走接口級還是應(yīng)用級,這該如何判斷呢?
事實(shí)上,在進(jìn)行某個服務(wù)的服務(wù)引入時,會統(tǒng)一利用 InterfaceCompatibleRegistryProtocol 的 refer 來生成一個 MigrationInvoker 對象,在 MigrationInvoker 中有三個屬性:
private volatile ClusterInvoker invoker; // 用來記錄接口級ClusterInvoker
private volatile ClusterInvoker serviceDiscoveryInvoker; // 用來記錄應(yīng)用級的ClusterInvoker
private volatile ClusterInvoker currentAvailableInvoker; // 用來記錄當(dāng)前使用的ClusterInvoker,要么是接口級,要么應(yīng)用級
一開始構(gòu)造出來的 MigrationInvoker 對象中三個屬性都為空,接下來會利用 MigrationRuleListener 來處理 MigrationInvoker 對象,也就是給這三個屬性賦值。
在 MigrationRuleListener 的構(gòu)造方法中,會從配置中心讀取 DUBBO_SERVICEDISCOVERY_MIGRATION 組下面的"當(dāng)前應(yīng)用名+.migration"的配置項(xiàng),配置項(xiàng)為 yml 格式,對應(yīng)的對象為 MigrationRule,也就是可以配置具體的遷移規(guī)則,比如:某個接口或某個應(yīng)用的 MigrationStep(FORCE_INTERFACE、APPLICATION_FIRST、FORCE_APPLICATION),還可以配置 threshold,表示一個閾值,比如:配置為 2,表示應(yīng)用級 Invoker 數(shù)量是接口級 Invoker 數(shù)量的兩倍時才使用應(yīng)用級 Invoker,不然就使用接口級數(shù)量,可以參考:https://cn.dubbo.apache.org/zh/docs/advanced/migration-invoker/
如果沒有配置遷移規(guī)則,則會看當(dāng)前應(yīng)用中是否配置了 migration.step,如果沒有,那就從全局配置中心讀取 dubbo.application.service-discovery.migration 來獲取 MigrationStep,如果也沒有配置,那 MigrationStep 默認(rèn)為 APPLICATION_FIRST
如果沒有配置遷移規(guī)則,則會看當(dāng)前應(yīng)用中是否配置了 migration.threshold,如果沒有配,則 threshold 默認(rèn)為 -1。
在應(yīng)用中可以這么配置:
dubbo:
application:
name: dubbo-springboot-demo-consumer
parameters:
migration.step: FORCE_APPLICATION
migration.threshold: 2
確定了 step 和 threshold 之后,就要真正開始給 MigrationInvoker 對象中的三個屬性賦值了,先根據(jù) step 調(diào)用不同的方法
switch (step) {
case APPLICATION_FIRST:
// 先進(jìn)行接口級服務(wù)引入得到對應(yīng)的ClusterInvoker,并賦值給invoker屬性
// 再進(jìn)行應(yīng)用級服務(wù)引入得到對應(yīng)的ClusterInvoker,并賦值給serviceDiscoveryInvoker屬性
// 再根據(jù)兩者的數(shù)量判斷到底用哪個,并且把確定的ClusterInvoker賦值給currentAvailableInvoker屬性
migrationInvoker.migrateToApplicationFirstInvoker(newRule);
break;
case FORCE_APPLICATION:
// 只進(jìn)行應(yīng)用級服務(wù)引入得到對應(yīng)的ClusterInvoker,并賦值給serviceDiscoveryInvoker和currentAvailableInvoker屬性
success = migrationInvoker.migrateToForceApplicationInvoker(newRule);
break;
case FORCE_INTERFACE:
default:
// 只進(jìn)行接口級服務(wù)引入得到對應(yīng)的ClusterInvoker,并賦值給invoker和currentAvailableInvoker屬性
success = migrationInvoker.migrateToForceInterfaceInvoker(newRule);
}
這里只需要分析當(dāng) step 為 APPLICATION_FIRST 時,是如何確定最終要使用的 ClusterInvoker 的。
得到了接口級 ClusterInvoker 和應(yīng)用級 ClusterInvoker 之后,就會利用 DefaultMigrationAddressComparator 來進(jìn)行判斷:
1.如果應(yīng)用級 ClusterInvoker 中沒有具體的 Invoker,那就表示只能用接口級 Invoker
2.如果接口級 ClusterInvoker 中沒有具體的 Invoker,那就表示只能用應(yīng)用級 Invoker
3.如果應(yīng)用級 ClusterInvoker 和接口級 ClusterInvoker 中都有具體的 Invoker,則獲取對應(yīng)的 Invoker 個數(shù)
4.如果在遷移規(guī)則和應(yīng)用參數(shù)中都沒有配置 threshold,那就讀取全局配置中心的 dubbo.application.migration.threshold 參數(shù),如果也沒有配置,則threshold 默認(rèn)為 0(不是-1了)
5.用應(yīng)用級 Invoker 數(shù)量 / 接口級 Invoker 數(shù)量,得到的結(jié)果如果大于等于 threshold,那就用應(yīng)用級 ClusterInvoker,否則用接口級 ClusterInvoker
threshold 默認(rèn)為 0,那就表示在既有應(yīng)用級 Invoker 又有接口級 Invoker 的情況下,就一定會用應(yīng)用級 Invoker,兩個正數(shù)相除,結(jié)果肯定為正數(shù),當(dāng)然你自己可以控制 threshold,如果既有既有應(yīng)用級 Invoker 又有接口級 Invoker 的情況下,你想在應(yīng)用級 Invoker 的個數(shù)大于接口級 Invoker 的個數(shù)時采用應(yīng)用級Invoker,那就可以把 threshold 設(shè)置為 1,表示個數(shù)相等,或者個數(shù)相除之后的結(jié)果大于 1 時用應(yīng)用級 Invoker,否者用接口級 Invoker
這樣 MigrationInvoker 對象中的三個數(shù)據(jù)就能確定好值了,和在最終的接口代理對象執(zhí)行某個方法時,就會調(diào)用 MigrationInvoker 對象的 invoke,在這個invoke 方法中會直接執(zhí)行 currentAvailableInvoker 對應(yīng)的 invoker 的 invoker 方法,從而進(jìn)入到了接口級 ClusterInvoker 或應(yīng)用級 ClusterInvoker 中,從而進(jìn)行負(fù)載均衡,選擇出具體的 DubboInvoer 或 TripleInvoker,完成真正的服務(wù)調(diào)用。
服務(wù)調(diào)用底層原理
在 Dubbo2.7 中,默認(rèn)的是 Dubbo 協(xié)議,因?yàn)?Dubbo 協(xié)議相比較于 Http1.1 而言,Dubbo 協(xié)議性能上是要更好的。
但是 Dubbo 協(xié)議自己的缺點(diǎn)就是不通用,假如現(xiàn)在通過 Dubbo 協(xié)議提供了一個服務(wù),那如果想要調(diào)用該服務(wù)就必須要求服務(wù)消費(fèi)者也要支持 Dubbo 協(xié)議,比如想通過瀏覽器直接調(diào)用 Dubbo 服務(wù)是不行的,想通過 Nginx 調(diào) Dubbo 服務(wù)也是不行得。
而隨著企業(yè)的發(fā)展,往往可能會出現(xiàn)公司內(nèi)部使用多種技術(shù)棧,可能這個部門使用 Dubbo,另外一個部門使用 Spring Cloud,另外一個部門使用 gRPC,那此時部門之間要想相互調(diào)用服務(wù)就比較復(fù)雜了,所以需要一個通用的、性能也好的協(xié)議,這就是 Triple 協(xié)議。
Triple 協(xié)議是基于 Http2 協(xié)議的,也就是在使用 Triple 協(xié)議發(fā)送數(shù)據(jù)時,會按 HTTP2 協(xié)議的格式來發(fā)送數(shù)據(jù),而 HTTP2 協(xié)議相比較于 HTTP1 協(xié)議而言,HTTP2是 HTTP1 的升級版,完全兼容 HTTP1,而且 HTTP2 協(xié)議從設(shè)計(jì)層面就解決了 HTTP1 性能低的問題。
另外,Google 公司開發(fā)的 gRPC,也基于的 HTTP2,目前 gRPC 是云原生事實(shí)上協(xié)議標(biāo)準(zhǔn),包括 k8s/etcd 等都支持 gRPC 協(xié)議。
所以 Dubbo3.0 為了能夠更方便的和 k8s 進(jìn)行通信,在實(shí)現(xiàn) Triple 的時候也兼容了 gRPC,也就是可以用 gPRC 的客戶端調(diào)用 Dubbo3.0 所提供的 triple 服務(wù),也可以用 triple 服務(wù)調(diào)用 gRPC 的服務(wù)。
Triple 的底層原理分析
就是因?yàn)?HTTP2 中的數(shù)據(jù)幀機(jī)制,Triple 協(xié)議才能支持 UNARY、SERVER_STREAM、BI_STREAM 三種模式。
1.UNARY:就是最普通的,服務(wù)端只有在接收到完請求包括的所有的 HEADERS 幀和 DATA 幀之后(通過調(diào)用 onCompleted() 發(fā)送最后一個 DATA 幀),才會處理數(shù)據(jù),客戶端也只有接收完響應(yīng)包括的所有的 HEADERS 幀和 DATA 幀之后,才會處理響應(yīng)結(jié)果。
2.SERVER_STREAM:服務(wù)端流,特殊的地方在于,服務(wù)端在接收完請求包括的所有的 DATA 幀之后,才會處理數(shù)據(jù),不過在處理數(shù)據(jù)的過程中,可以多次發(fā)送響應(yīng) DATA 幀(第一個 DATA 幀發(fā)送之前會發(fā)送一個 HEADERS 幀),客戶端每接收到一個響應(yīng) DATA 幀就可以直接處理該響應(yīng) DATA 幀,這個模式下,客戶端只能發(fā)一次數(shù)據(jù),但能多次處理響應(yīng) DATA 幀。
3.BI_STREAM:雙端流,或者客戶端流,特殊的地方在于,客戶端可以控制發(fā)送多個請求 DATA 幀(第一個 DATA 幀發(fā)送之前會發(fā)送一個 HEADERS 幀),服務(wù)端會不斷的接收到請求 DATA 幀并進(jìn)行處理,并且及時的把處理結(jié)果作為響應(yīng) DATA 幀發(fā)送給客戶端(第一個 DATA 幀發(fā)送之前會發(fā)送一個 HEADERS 幀),而客戶端每接收到一個響應(yīng)結(jié)果 DATA 幀也會直接處理,這種模式下,客戶端和服務(wù)端都在不斷的接收和發(fā)送 DATA 幀并進(jìn)行處理,注意請求 HEADER 幀和響應(yīng) HEADERS 幀都只發(fā)了一個。
Triple 請求調(diào)用和響應(yīng)處理
創(chuàng)建一個 Stream 的前提是先得有一個 Socket 連接,所以我們得先知道 Socket 連接是在哪創(chuàng)建的。
在服務(wù)提供者進(jìn)行服務(wù)導(dǎo)出時,會按照協(xié)議以及對應(yīng)的端口啟動 Server,比如:Triple 協(xié)議就會啟動 Netty 并綁定指定的端口,等待 Socket 連接,在進(jìn)行服務(wù)消費(fèi)者進(jìn)行服務(wù)引入的過程中,會生成 TripleInvoker 對象,在構(gòu)造 TripleInvoker 對象的構(gòu)造方法中,會利用 ConnectionManager 創(chuàng)建一個 Connection 對象,而Connection 對象中包含了一個 Bootstrap 對象(Netty 中用來建立 Socket 連接的),不過以上都只是創(chuàng)建對象,并不會真正和服務(wù)去建立 Socket 連接,所以在生成 TripleInvoker 對象過程中不會真正去創(chuàng)建 Socket 連接,那什么時候創(chuàng)建的呢?
當(dāng)我們在服務(wù)消費(fèi)端執(zhí)行以下代碼時:demoService.sayHello("habit")
demoService 是一個代理對象,在執(zhí)行方法的過程中,最終會調(diào)用 TripleInvoker 的 doInvoke() 方法,在 doInvoke() 方法中,會利用 Connection 對象來判斷Socket 連接是否可用,如果不可用并且沒有初始化,那就會創(chuàng)建 Socket 連接。
一個 Connection 對象就表示一個 Socket 連接,在 TripleInvoker 對象中也只有一個 Connection 對象,也就是一個 TripleInvoker 對象只對應(yīng)一個 Socket 連接,這個和 DubboInvoker 不太一樣,一個 DubboInvoker 中可以有多個 ExchangeClient,每個 ExchangeClient 都會與服務(wù)端創(chuàng)建一個 Socket 連接,所以一個DubboInvoker 可以對應(yīng)多個 Socket 連接,當(dāng)然多個 Socket 連接的目的就是提高并發(fā),不過在 TripleInvoker 對象中就不需要這么來設(shè)計(jì)了,因?yàn)榭梢?Stream機(jī)制來提高并發(fā)。
以上,我們知道了,當(dāng)我們利用服務(wù)接口的代理對象執(zhí)行方法時就會創(chuàng)建一個 Socket 連接,就算這個代理對象再次執(zhí)行方法時也不會再次創(chuàng)建 Socket 連接了,值得注意的是,有可能兩個服務(wù)接口對應(yīng)的是一個 Socket 連接,舉個例子。
比如服務(wù)提供者應(yīng)用 A,提供了 DemoService 和 HelloService 兩個服務(wù),服務(wù)消費(fèi)者應(yīng)用 B 引入了這兩個服務(wù),那么在服務(wù)消費(fèi)者這端,這個兩個接口對應(yīng)的代理對象對應(yīng)的 TripleInvoker 是不同的兩個,但是這兩個 TripleInvoker 會公用一個 Socket 連接,因?yàn)?ConnectionManager 在創(chuàng)建 Connection 對象時會根據(jù)服務(wù) URL 的 address 進(jìn)行緩存,后續(xù)這兩個代理對象在執(zhí)行方法時使用的就是同一個 Socket 連接,但是是不同的 Stream。
Socket 連接創(chuàng)建好之后,就需要發(fā)送 Invocation 對象給服務(wù)提供者了,因?yàn)槭腔诘?HTTP2,所以要先創(chuàng)建一個 Stream,然后再通過 Stream 來發(fā)送數(shù)據(jù)。
TripleInvoker 中用的是 Netty,所以最終會利用 Netty 來創(chuàng)建 Stream,對應(yīng)的對象為 Http2StreamChannel,消費(fèi)端的 TripleInvoker 最終會利用Http2StreamChannel 來發(fā)送和接收數(shù)據(jù)幀,數(shù)據(jù)幀對應(yīng)的對象為 Http2Frame,它又分為 Http2DataFrame、Http2HeadersFrame 等具體類型。
正常情況下,會每生成一個數(shù)據(jù)幀就會通過 Http2StreamChannel 發(fā)送出去,但是在 Triple 中有一個小小的優(yōu)化,會有一個批量發(fā)送的思想,當(dāng)要發(fā)送一個數(shù)據(jù)幀時,會先把數(shù)據(jù)幀放入一個 WriteQueue 中,然后會從線程池中拿到一個線程調(diào)用 WriteQueue 的 flush 方法,該方法的實(shí)現(xiàn)為:
private void flush() {
try {
QueuedCommand cmd;
int i = 0;
boolean flushedOnce = false;
// 只要隊(duì)列中有元素就取出來,沒有則退出while
while ((cmd = queue.poll()) != null) {
// 把數(shù)據(jù)幀添加到Http2StreamChannel中,添加并不會立馬發(fā)送,調(diào)用了flush才發(fā)送
cmd.run(channel);
i++;
// DEQUE_CHUNK_SIZE=128
// 連續(xù)從隊(duì)列中取到了128個數(shù)據(jù)幀就flush一次
if (i == DEQUE_CHUNK_SIZE) {
i = 0;
channel.flush();
flushedOnce = true;
}
}
// i != 0 表示從隊(duì)列中取到了數(shù)據(jù)但是沒滿128個
// 如果i=0,flushedOnce=false也flush一次
if (i != 0 || !flushedOnce) {
channel.flush();
}
} finally {
scheduled.set(false);
// 如果隊(duì)列中又有數(shù)據(jù)了,則繼續(xù)會遞歸調(diào)用flush
if (!queue.isEmpty()) {
scheduleFlush();
}
}
}
總體思想是,只要向 WriteQueue 中添加一個數(shù)據(jù)幀之后,那就會嘗試開啟一個線程,要不要開啟線程要看 CAS,比如現(xiàn)在有 10 個線程同時向 WriteQueue 中添加了一個數(shù)據(jù)幀,那么這 10 個線程中的某一個會 CAS 成功,其他會 CAS 失敗,那么此時 CAS 成功的線程會負(fù)責(zé)從線程池中獲取另外一個線程執(zhí)行上面的 flush 方法,從而獲取 WriteQueue 中的數(shù)據(jù)幀然后發(fā)送出去。
有了底層這套設(shè)計(jì)之后,對于 TripleInvoker 而 ,它只需要把要發(fā)送的數(shù)據(jù)封裝為數(shù)據(jù)幀,然后添加到 WriteQueue 中就可以了。
在 TripleInvoker 的 doInvoke() 源碼中,在創(chuàng)建完成 Socket 連接后,就會:
1.基于 Socket 連接先構(gòu)造一個 ClientCall 對象
2.根據(jù)當(dāng)前調(diào)用的方法信息構(gòu)造一個 RequestMetadata 對象,這個對象表示,當(dāng)前調(diào)用的是哪個接口的哪個方法,并且記錄了所配置的序列化方式,壓縮方式,超時時間等
3.緊接著構(gòu)造一個 ClientCall.Listener,這個 Listener 是用來處理響應(yīng)結(jié)果的,針對不同的流式調(diào)用類型,會構(gòu)造出不同的 ClientCall.Listener:UNARY:會構(gòu)造出一個 UnaryClientCallListener,內(nèi)部包含了一個 DeadlineFuture,DeadlineFuture 是用來控制 timeout 的SERVER_STREAM:會構(gòu)造出一個 ObserverToClientCallListenerAdapter,內(nèi)部包含了調(diào)用方法時傳入進(jìn)來的 StreamObserver 對象,最終就是由這個StreamObserver 對象來處理響應(yīng)結(jié)果的BI_STREAM:和 SERVER_STREAM 一樣,也會構(gòu)造出來一個 ObserverToClientCallListenerAdapter
4.緊著著,就會調(diào)用 ClientCall 對象的 start 方法創(chuàng)建一個 Stream,并且返回一個 StreamObserver 對象
5.得到了 StreamObserver 對象后,會根據(jù)不同的流式調(diào)用類型來使用這個 StreamObserver 對象UNARY:直接調(diào)用 StreamObserver 對象的 onNext() 方法來發(fā)送方法參數(shù),然后調(diào)用 onCompleted 方法,然后返回一個 new AsyncRpcResult(future, invocation),future 就是 DeadlineFuture,后續(xù)會通過 DeadlineFuture 同步等待響應(yīng)結(jié)果的到來,并最終把獲取到的響應(yīng)結(jié)果返回給業(yè)務(wù)方法。SERVER_STREAM:直接調(diào)用 StreamObserver 對象的 onNext() 方法來發(fā)送方法參數(shù),然后調(diào)用 onCompleted 方法,然后返回一個 new AsyncRpcResult(CompletableFuture.completedFuture(new AppResponse()), invocation),后續(xù)不會同步了,并且返回 null 給業(yè)務(wù)方法。BI_STREAM:直接返回 new AsyncRpcResult( CompletableFuture.completedFuture(new AppResponse(requestObserver)), invocation),也不同同步等待響應(yīng)結(jié)果了,而是直接把 requestObserver 對象返回給了業(yè)務(wù)方法。所以我們可以發(fā)現(xiàn),不管是哪種流式調(diào)用類型,都會先創(chuàng)建一個 Stream,得到對應(yīng)的一個 StreamObserver 對象,然后調(diào)用 StreamObserver 對象的onNext 方法來發(fā)送數(shù)據(jù),比如發(fā)送服務(wù)接口方法的入?yún)⒅担热缫粋€ User 對象:在發(fā)送 User 對象之前,會先發(fā)送請求頭,請求頭中包含了當(dāng)前調(diào)用的是哪個接口、哪個方法、版本號、序列化方式、壓縮方式等信息,注意請求頭中會包含一些 gRPC 相關(guān)的 key,主要就是為了兼容 gRPC然后就是發(fā)送請求體然后再對 User 對象進(jìn)行序列化,得到字節(jié)數(shù)組然后再壓縮字節(jié)數(shù)組然后把壓縮之后的字節(jié)數(shù)組以及是否壓縮標(biāo)記生成一個 DataQueueCommand 對象,并且把這個對象添加到 writeQueue 中去,然后執(zhí)行scheduleFlush(),該方法就會開啟一個線程從 writeQueue 中獲取數(shù)據(jù)進(jìn)行發(fā)送,發(fā)送時就會觸發(fā) DataQueueCommand 對象的 doSend 方法進(jìn)行發(fā)送,該方法中會構(gòu)造一個 DefaultHttp2DataFrame 對象,該對象中由兩個屬性 endStream,表示是不是 Stream 中的最后一幀,另外一個屬性為content,表示幀攜帶的核心數(shù)據(jù),該數(shù)據(jù)格式為:第一個字節(jié)記錄請求體是否被壓縮緊著的四個字節(jié)記錄字節(jié)數(shù)組的長度后面就真正的字節(jié)數(shù)據(jù)以上是 TripleInvoker 發(fā)送數(shù)據(jù)的流程,接下來就是 TripleInvoker 接收響應(yīng)數(shù)據(jù)的流程,ClientCall.Listener 就是用來監(jiān)聽是否接收到的響應(yīng)數(shù)據(jù)的,不同的流式調(diào)用方式會對應(yīng)不同的 ClientCall.Listener:UNARY:UnaryClientCallListener,內(nèi)部包含了一個 DeadlineFuture,DeadlineFuture 是用來控制 timeout 的SERVER_STREAM:ObserverToClientCallListenerAdapter,內(nèi)部包含了調(diào)用方法時傳入進(jìn)來的 StreamObserver 對象,最終就是由這個StreamObserver 對象來處理響應(yīng)結(jié)果的BI_STREAM:和 SERVER_STREAM 一樣,也會構(gòu)造出來一個 ObserverToClientCallListenerAdapter那現(xiàn)在要了解的就是,如何知道某個 Stream 中有響應(yīng)數(shù)據(jù),然后觸發(fā)調(diào)用 ClientCall.Listener 對象的相應(yīng)的方法。要監(jiān)聽某個 Stream 中是否有響應(yīng)數(shù)據(jù),這個肯定是 Netty 來做的,實(shí)際上,在之前創(chuàng)建 Stream 時,會向 Http2StreamChannel 綁定一個TripleHttp2ClientResponseHandler,很明顯這個 Handler 就是用來處理接收到的響應(yīng)數(shù)據(jù)的。在 TripleHttp2ClientResponseHandler 的 channelRead0 方法中,每接收一個響應(yīng)數(shù)據(jù)就會判斷是 Http2HeadersFrame 還是 Http2DataFrame,然后調(diào)用 ClientTransportListener 中對應(yīng)的 onHeader 方法和 onData 方法:onHeader 方法通過處理響應(yīng)頭,會生成一個 TriDecoder,它是用來解壓并處理響應(yīng)體的onData 方法會利用 TriDecoder 的 deframe() 方法來處理響應(yīng)體另外如果服務(wù)提供者那邊調(diào)用了 onCompleted 方法,會向客戶端響應(yīng)一個請求頭,endStream 為 true,表示響應(yīng)結(jié)束,也會觸發(fā)執(zhí)行 onHeader 方法,從而會調(diào)用 TriDecoder 的 close() 方法.TriDecoder 的 deframe() 方法在處理響應(yīng)體數(shù)據(jù)時,會分為兩個步驟:先解析前 5 個字節(jié),先解析第 1 個字節(jié)確定該響應(yīng)體是否壓縮了,再解析后續(xù) 4 個字節(jié)確定響應(yīng)體內(nèi)容的字節(jié)長度然后再取出該長度的字節(jié)作為響應(yīng)體數(shù)據(jù),如果壓縮了,那就進(jìn)行解壓,然后把解壓之后的字節(jié)數(shù)組傳遞給 ClientStreamListenerImpl 的onMessage() 方法,該方法就會按對應(yīng)的序列化方式進(jìn)行反序列化,得到最終的對象,然后再調(diào)用到最終的 UnaryClientCallListener 或者ObserverToClientCallListenerAdapter 的 onMessage() 方法。TriDecoder 的 close() 方法最終也會調(diào)用到 UnaryClientCallListener 或者 ObserverToClientCallListenerAdapter 的 close() 方法。UnaryClientCallListener,構(gòu)造它時傳遞了一個 DeadlineFuture 對象:onMessage() 接收到響應(yīng)結(jié)果對象后,會把結(jié)果對象賦值給 appResponse 屬性onClose() 會取出 appResponse 屬性記錄的結(jié)果對象構(gòu)造出來一個 AppResponse 對象,然后調(diào)用 DeadlineFuture 的 received 方法,從而將方法調(diào)用線程接阻塞,并得到響應(yīng)結(jié)果對象。ObserverToClientCallListenerAdapter,構(gòu)造它時傳遞了一個 StreamObserver 對象:onMessage() 接收到響應(yīng)結(jié)果對象后,會調(diào)用 StreamObserver 對象的 onNext(),并把結(jié)果對象傳給 onNext() 方法,從觸發(fā)了程序員的 onNext() 方法邏輯。onClose() 就會調(diào)用 StreamObserver 對象的 onCompleted(),或者調(diào)用 onError() 方法
Triple 請求處理和響應(yīng)結(jié)果發(fā)送
其實(shí)這部分內(nèi)容和發(fā)送請求和處理響應(yīng)是非常類似的,無非就是把視角從消費(fèi)端切換到服務(wù)端,前面分析的是消費(fèi)端發(fā)送和接收數(shù)據(jù),現(xiàn)在要分析的是服務(wù)端接收和發(fā)送數(shù)據(jù)。
消費(fèi)端在創(chuàng)建一個 Stream 后,會生成一個對應(yīng)的 StreamObserver 對象用來發(fā)送數(shù)據(jù)和一個 ClientCall.Listener 用來接收響應(yīng)數(shù)據(jù),對于服務(wù)端其實(shí)也一樣,在接收到消費(fèi)端創(chuàng)建 Stream 的命令后,也需要生成一個對應(yīng)的 StreamObserver 對象用來響應(yīng)數(shù)據(jù)以及一個 ServerCall.Listener 用來接收請求數(shù)據(jù)。
在服務(wù)導(dǎo)出時,TripleProtocol 的 export 方法中會開啟一個 ServerBootstrap,并綁定指定的端口,并且最重要的是,Netty 會負(fù)責(zé)接收創(chuàng)建 Stream 的信息,一旦就收到這個信號,就會生成一個 ChannelPipeline,并給 ChannelPipeline 綁定一個 TripleHttp2FrameServerHandler,而這個TripleHttp2FrameServerHandler 就可以用來處理 Http2HeadersFrame 和 Http2DataFrame。
比如在接收到請求頭后,會構(gòu)造一個 ServerStream 對象,該對象有一個 ServerTransportObserver 對象,ServerTransportObserver 對象就會真正來處理請求頭和請求體:
1.onHeader() 方法,用來處理請求頭比如從請求頭中得到當(dāng)前請求調(diào)用的是哪個服務(wù)接口,哪個方法構(gòu)造一個 TriDecoder 對象,TriDecoder 對象用來處理請求體構(gòu)造一個 ReflectionServerCall 對象并調(diào)用它的 doStartCall() 方法,從而生成不同的 ServerCall.ListenerUNARY:UnaryServerCallListenerSERVER_STREAM:ServerStreamServerCallListenerBI_STREAM:BiStreamServerCallListener并且在構(gòu)造這些 ServerCall.Listener 時會把 ReflectionServerCall 對象傳入進(jìn)去,ReflectionServerCall 對象可以用來向客戶端發(fā)送數(shù)據(jù)
2.onData() 方法,用來處理請求體,調(diào)用 TriDecoder 對象的 deframe 方法來處理請求體,如果是 endStream,那還會調(diào)用 TriDecoder 對象的 close 方法
TriDecoder:
deframe():這個方法的作用和客戶端時一樣的,都是先解析請求體的前 5 個字節(jié),然后解壓請全體,然后反序列化得到請求參數(shù)對象,然后調(diào)用不同的ServerCall.Listener 中的 onMessage()
close():當(dāng)客戶端調(diào)用 onCompleted 方法時,就表示發(fā)送數(shù)據(jù)完畢,此時會發(fā)送一個 DefaultHttp2DataFrame 并且 endStream 為 true,從而會觸發(fā)服務(wù)端TriDecoder 對象的 close() 方法,從而調(diào)用不同的 ServerCall.Listener 中的 onComplete()
UnaryServerCallListener:
1.在接收到請求頭時,會構(gòu)造 UnaryServerCallListener 對象,沒什么特殊的
2.然后接收到請求體時,請求體中的數(shù)據(jù)就是調(diào)用接口方法的入?yún)⒅?,比?User 對象,那么就會調(diào)用 UnaryServerCallListener 的 onMessage() 方法,在這個方法中會把 User 對象設(shè)置到 invocation 對象中
3.當(dāng)消費(fèi)端調(diào)用 onCompleted() 方法,表示請求體數(shù)據(jù)發(fā)送完畢,從而觸發(fā) UnaryServerCallListener 的 onComplete() 方法,在該方法中會調(diào)用 invoke() 方法,從而執(zhí)行服務(wù)方法,并得到結(jié)果,得到結(jié)果后,會調(diào)用 UnaryServerCallListener 的 onReturn() 方法,把結(jié)果通過 responseObserver 發(fā)送給消費(fèi)端,并調(diào)用 responseObserver 的 onCompleted() 方法,表示響應(yīng)數(shù)據(jù)發(fā)送完畢,responseObserver 是 ReflectionServerCall 對象的一個 StreamObserver 適配對象(ServerCallToObserverAdapter)。
再來看 ServerStreamServerCallListener:
1.在接收到請求頭時,會構(gòu)造 ServerStreamServerCallListener 對象,沒什么特殊的
2.然后接收到請求體時,請求體中的數(shù)據(jù)就是調(diào)用接口方法的入?yún)⒅?,比?User 對象,那么就會調(diào)用 ServerStreamServerCallListener 的 onMessage() 方法,在這個方法中會把 User 對象以及 responseObserver 對象設(shè)置到 invocation 對象中,這是和 UnaryServerCallListener 不同的地方,UnaryServerCallListener 只會把 User 對象設(shè)置給 invocation,而 ServerStreamServerCallListener 還會把 responseObserver 對象設(shè)置進(jìn)去,因?yàn)榉?wù) 端流需要這個 responseObserver 對象,服務(wù)方法拿到這個對象后就可以自己來控制如何發(fā)送響應(yīng)體,并什么時候調(diào)用 onCompleted() 方法來表示響應(yīng)體發(fā)送完畢。
3.當(dāng)消費(fèi)端調(diào)用 onCompleted() 方法,表示請求體數(shù)據(jù)發(fā)送完畢,從而觸發(fā) ServerStreamServerCallListener 的 onComplete() 方法,在該方法中會調(diào)用invoke() 方法,從而執(zhí)行服務(wù)方法,從而會通過 responseObserver 對象來發(fā)送數(shù)據(jù)
4.方法執(zhí)行完后,仍然會調(diào)用 ServerStreamServerCallListener的onReturn() 方法,但是個空方法
再來看最后一個 BiStreamServerCallListener:
1.在接收到請求頭時,會構(gòu)造 BiStreamServerCallListener 對象,這里比較特殊,會把 responseObserver 設(shè)置給 invocation 并執(zhí)行 invoke() 方法,從而執(zhí)行服務(wù)方法,并執(zhí)行 onReturn() 方法,onReturn() 方法中會把服務(wù)方法的執(zhí)行結(jié)果,也是一個 StreamObserver 對象,賦值給 BiStreamServerCallListener 對象的requestObserver 屬性。
2.這樣,在接收到請求頭時,服務(wù)方法就會執(zhí)行了,并且得到了一個 requestObserver,它是程序員定義的,是用來處理請求體的,另外的 responseObserver是用來發(fā)送響應(yīng)體的。
3.緊接著就會收到請求體,從而觸發(fā) onMessage() 方法,該方法中會調(diào)用 requestObserver 的 onNext() 方法,這樣就可以做到,服務(wù)端能實(shí)時的接收到消費(fèi)端每次所發(fā)送過來的數(shù)據(jù),并且進(jìn)行處理,處理過程中,如果需要響應(yīng)就可以利用 responseObserver 進(jìn)行響應(yīng)
4.一旦消費(fèi)端那邊調(diào)用了 onCompleted() 方法,那么就會觸發(fā) BiStreamServerCallListener 的 onComplete 方法,該方法中也就是調(diào)用 requestObserver 的onCompleted(),主要就觸發(fā)程序員自己寫的 StreamObserver 對象中的 onCompleted(),并沒有針對底層的 Stream 做什么事情。
總結(jié)
不管是 Unary,還是 ServerStream,還是 BiStream,底層客戶端和服務(wù)端之前都只有一個 Stream,它們?nèi)叩膮^(qū)別在于:
1.Unary:通過流,將方法入?yún)⒅底鳛檎埱篌w發(fā)送出去,而且只發(fā)送一次,服務(wù)端這邊接收到請求體之后,會執(zhí)行服務(wù)方法,得到結(jié)果,把結(jié)果返回給客戶端,也只響應(yīng)一次。
2.ServerStream:通過流,將方法入?yún)⒅底鳛檎埱篌w發(fā)送出去,而且只發(fā)送一次,服務(wù)端這邊接收到請求體之后,會執(zhí)行服務(wù)方法,并且會把當(dāng)前流對應(yīng)的StreamObserver 對象也傳給服務(wù)方法,由服務(wù)方法自己控制如何響應(yīng),響應(yīng)幾次,響應(yīng)什么數(shù)據(jù),什么時候響應(yīng)結(jié)束,都由服務(wù)方法自己控制。
3.BiStream,通過流,客戶端和服務(wù)端,都可以發(fā)送和響應(yīng)多次。
審核編輯 黃宇
-
URL
+關(guān)注
關(guān)注
0文章
139瀏覽量
15340 -
Dubbo
+關(guān)注
關(guān)注
0文章
20瀏覽量
3180
發(fā)布評論請先 登錄
相關(guān)推薦
評論