開(kāi)發(fā)環(huán)境:Ubuntu VS Code
編譯器:g++
框架源碼下載:GitHub
認(rèn)識(shí)RPC
RPC的全稱是遠(yuǎn)程過(guò)程調(diào)用(Remote Procedure Call)。
什么是遠(yuǎn)程過(guò)程調(diào)用呢?
那么對(duì)于一個(gè)聊天系統(tǒng)有int send_information(int friend_id,string msg)這個(gè)方法,我們的一個(gè)處理邏輯是不是這樣:
- 調(diào)用bool is_exist(int friend_id)判斷用戶是否在線
- 根據(jù)結(jié)果在決議是發(fā)送在線消息還是離線消息。
那么對(duì)于一個(gè)繼承了登錄和聊天功能的系統(tǒng),我們?cè)诒镜卣{(diào)用一個(gè)函數(shù),就直接返回值=函數(shù)名(參數(shù)1,參數(shù)2),就直接執(zhí)行了這個(gè)過(guò)程并且得到了返回值。
但是如果考慮高并發(fā)、高可用以及系統(tǒng)擴(kuò)展性的話,那我們不得不引入分布式的設(shè)計(jì)。這意味這,登錄和聊天會(huì)部署在不同的機(jī)器上!那么要完成上面的邏輯,就不得不依靠網(wǎng)絡(luò),將要調(diào)用的函數(shù)名以及參數(shù)通過(guò)網(wǎng)絡(luò)打包發(fā)送給另一個(gè)機(jī)器,然后由另一臺(tái)機(jī)器將結(jié)果發(fā)過(guò)來(lái)。
而我們要做的RPC框架就是為使這個(gè)過(guò)程更好用而設(shè)計(jì)的。
RPC框架的設(shè)計(jì)
博文的標(biāo)題是基于 protobuf 和 zookeeper 的RPC框架,那么protobuf 和 zookeeper又在整個(gè)框架啊中扮演怎樣的角色呢?
protobuf 作用
protobuf 主要是作為整個(gè)框架的傳輸協(xié)議。我們可以看一下整個(gè)框架對(duì)于傳輸信息的格式定義:
{
bytes service_name = 1; //類名
bytes method_name = 2; //方法名
uint32 args_size = 3; //參數(shù)大小
}
可以看到,它定義了要調(diào)用方法是屬于哪個(gè)類的哪個(gè)方法以及這個(gè)方法所需要的的參數(shù)大小。
那么參數(shù)呢?是怎樣定義的?
首先我們來(lái)看一下我們框架內(nèi)傳輸?shù)臄?shù)據(jù)是什么:4字節(jié)標(biāo)識(shí)頭部長(zhǎng)度+RpcHeader+args
* 16表示整個(gè)類名+方法名+參數(shù)長(zhǎng)度的大小
有個(gè)這個(gè)長(zhǎng)度,我們就可以從整個(gè)長(zhǎng)度中截取UserServiceLogin15這一段
再根據(jù)RpcHeader來(lái)反序列話得出類名和方法名以及參數(shù)長(zhǎng)度三個(gè)重要數(shù)據(jù)
* 15表示后面的參數(shù)長(zhǎng)度
由于我們找到了類名和方法名,我們就可以在整個(gè)框架存儲(chǔ)這些信息的地方得到一個(gè)對(duì)于這個(gè)方法的描述。
然后借用protobuf的service對(duì)象提供的一個(gè)接口GetResponsePrototype,并且根據(jù)這個(gè)方法的描述來(lái)反序列化出參數(shù)。這個(gè)都是根據(jù)我們注冊(cè)的方法的描述來(lái)做的。
*/
18UserServiceLogin15zhang san123456
注:如果還是有點(diǎn)迷糊,可以保留參數(shù)解釋信息,看到后面就大致懂了
zookeeper
zookeeper 呢在這里其實(shí)主要就是起到了一個(gè)配置中心的目的。
什么意思呢?
zookeeper上面我們標(biāo)識(shí)了每個(gè)類的方法所對(duì)應(yīng)的分布式節(jié)點(diǎn)地址,當(dāng)我們其他服務(wù)器想要RPC的時(shí)候,就去 zookeeper 上面查詢對(duì)應(yīng)要調(diào)用的服務(wù)在哪個(gè)節(jié)點(diǎn)上。
注意:這里就相當(dāng)于,我其他服務(wù)器來(lái) zookeeper 查詢User::is_exist,然后會(huì)得到127.0.0.1:8001 這個(gè)值,這個(gè)值就是你布置這個(gè)功能的一個(gè)RPC節(jié)點(diǎn)的網(wǎng)絡(luò)標(biāo)識(shí)符,然后向這個(gè)節(jié)點(diǎn)去發(fā)送參數(shù)并且等待這個(gè)節(jié)點(diǎn)的相應(yīng)。
從框架的使用來(lái)認(rèn)識(shí)
框架的使用一般都是在example目錄下的 callee/UserService.cpp 里面
//框架服務(wù)提供provider
RpcProvider provide;
provide.notify_service(new UserService());
provide.run();
可以看到,主要去做了三個(gè)事情:
- 首先 RPC 框架肯定是部署到一臺(tái)服務(wù)器上的,所以我們需要對(duì)這個(gè)服務(wù)器的 ip 和 port 進(jìn)行初始化
- 然后創(chuàng)建一個(gè) porvider(也就是server)對(duì)象,將當(dāng)前 UserService 這個(gè)對(duì)象傳遞給他,也就是其實(shí)這個(gè) RPC 框架和我們執(zhí)行具體業(yè)務(wù)的節(jié)點(diǎn)是在同一個(gè)服務(wù)器上的。RPC框架負(fù)責(zé)解析其他服務(wù)器傳遞過(guò)來(lái)的請(qǐng)求,然后將這些參數(shù)傳遞給本地的方法。并將返回值返回給其他服務(wù)器。
- 最后是去讓這個(gè) provider 去 run 起來(lái)。具體我們其實(shí)可以看一下源代碼:
void RpcProvider::run()
{
//獲取ip和port
string ip = RpcApplication::get_instance().get_configure().find_load("rpcserver_ip");
uint16_t port = atoi(RpcApplication::get_instance().get_configure().find_load("rpcserver_port").c_str());
//cout << ip << ":" << port << endl;
InetAddress address(ip, port);
//創(chuàng)建tcpserver對(duì)象
TcpServer server(&eventloop_, address, "RpcProvider");
//綁定鏈接回調(diào)和消息讀寫(xiě)回調(diào)方法
server.setConnectionCallback(bind(&RpcProvider::on_connection, this, _1));
server.setMessageCallback(bind(&RpcProvider::on_message, this, _1, _2, _3));
//設(shè)置muduo庫(kù)的線程數(shù)量
server.setThreadNum(4);
//把當(dāng)前rpc節(jié)點(diǎn)上要發(fā)布的服務(wù)全部注冊(cè)到zk上面,讓rpc client可以從zk上發(fā)現(xiàn)服務(wù)
ZookeeperClient zk_client;
zk_client.start();
//在配置中心中創(chuàng)建節(jié)點(diǎn)
for (auto &sp : service_map_)
{
string service_path = "/" + sp.first;
zk_client.create(service_path.c_str(), nullptr, 0);
for (auto &mp : sp.second.method_map_)
{
string method_path = service_path + "/" + mp.first;
char method_path_data[128] = {0};
sprintf(method_path_data, "%s:%d", ip.c_str(), port);
//ZOO_EPHEMERAL 表示znode時(shí)候臨時(shí)性節(jié)點(diǎn)
zk_client.create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);
}
}
RPC_LOG_INFO("server RpcProvider [ip: %s][port: %d]", ip.c_str(), port);
//啟動(dòng)網(wǎng)絡(luò)服務(wù)
server.start();
eventloop_.loop();
}
可以看到,整個(gè) run 其實(shí)就是干了這么幾件事情:
- 因?yàn)榈讓诱{(diào)用的是muduo網(wǎng)絡(luò)庫(kù),所以這里會(huì)獲取ip地址和端口號(hào),然后初始化網(wǎng)絡(luò)層
- 然后去設(shè)置一個(gè)連接回調(diào)以及發(fā)生讀寫(xiě)事件時(shí)候的回調(diào)函數(shù)(稍后介紹)
- 然后設(shè)置整個(gè) muduo 網(wǎng)絡(luò)庫(kù)工作的線程數(shù)量
- 然后創(chuàng)建zookeeper配置中心,將這些方法的信息以及本機(jī)的IP地址注冊(cè)到zookeeper
- 然后開(kāi)啟本機(jī)服務(wù)器的事件循環(huán),等待其他服務(wù)器的連接
其他服務(wù)器是怎樣調(diào)用的呢
我們看一下 example 目錄下的 caller/CallUserService.cpp 里面是怎樣調(diào)用的。
RpcApplication::init(argc, argv);
//演示調(diào)用遠(yuǎn)程發(fā)布的rpc方法login
ik::UserServiceRpc_Stub stub(new RpcChannel());
//請(qǐng)求參數(shù)和響應(yīng)
ik::LoginRequest request;
request.set_name("zhang san");
request.set_password("123456");
ik::LoginResponse response;
RpcControl control;
//發(fā)起rpc調(diào)用,等待響應(yīng)返回
stub.Login(&control, &request, &response, nullptr);
//rpc調(diào)用完成,讀調(diào)用的結(jié)果
if (response.errmsg().error() == 0)
{
//沒(méi)錯(cuò)誤
cout << "rpc login response: " << response.success() << endl;
}
else
{
//有錯(cuò)誤
cout << "rpc login response errer: " << response.errmsg().error_msg() << endl;
}
同樣,也是有以下幾個(gè)步驟:
- 初始化 RPC 遠(yuǎn)程調(diào)用要連接的服務(wù)器
- 定義一個(gè) UserSeervice 的 stub 樁類,由這個(gè)裝類去調(diào)用Login方法,這個(gè)login方法可以去看一下源碼的定義:
void Login(::google::protobuf::RpcController *controller,
const ::ik::LoginRequest *request,
::ik::LoginResponse *response,
::google::protobuf::Closure *done)
{
//框架給業(yè)務(wù)上報(bào)了請(qǐng)求參數(shù) request,業(yè)務(wù)獲取相應(yīng)數(shù)據(jù)做本地業(yè)務(wù)
string name = request->name();
string password = request->password();
//本地業(yè)務(wù)
bool login_result = Login(name, password);
//把響應(yīng)給調(diào)用方返回
ik::ErrorMsg *errmsg = response->mutable_errmsg();
errmsg->set_error(0);
errmsg->set_error_msg("");
response->set_success(login_result);
//執(zhí)行回調(diào)操作
done->Run();
}
可以看到,Login的 RPC 重載函數(shù)有四個(gè)參數(shù):controller(表示函數(shù)是否出錯(cuò))、request(參數(shù))、response(返回值)、done(回調(diào)函數(shù))
其主要做的也是去圍繞著解析參數(shù),將參數(shù)放入本地調(diào)用的方法,將結(jié)果返回并執(zhí)行回調(diào)函數(shù)。至于這個(gè)回調(diào)函數(shù)則是在服務(wù)端執(zhí)行讀寫(xiě)事件回調(diào)函數(shù)綁定的。
綁定的是如下方法:
{
string response_str;
//序列化
if (response->SerializeToString(&response_str))
{
//發(fā)送序列化的數(shù)據(jù)
conn->send(response_str);
}
else
{
//序列化失敗
RPC_LOG_ERROR("serialize reponse error");
}
//短鏈接
conn->shutdown();
}
它會(huì)將由bind函數(shù)綁定的參數(shù):response_str 發(fā)送回去,然后調(diào)用方的服務(wù)器就會(huì)收到這個(gè)返回值并解析它。
樁類是干嘛的
那么其實(shí)現(xiàn)在來(lái)說(shuō),我們并沒(méi)有看到調(diào)用方是何時(shí)發(fā)送了要調(diào)用方法以及相應(yīng)參數(shù)?
我們還是需要去返回樁類,這個(gè)是由protobuf自動(dòng)去幫你生成的。
public:
UserServiceRpc_Stub(::google::protobuf::RpcChannel* channel);
UserServiceRpc_Stub(::google::protobuf::RpcChannel* channel,
::google::protobuf::Service::ChannelOwnership ownership);
~UserServiceRpc_Stub();
inline ::google::protobuf::RpcChannel* channel() { return channel_; }
// implements UserServiceRpc ------------------------------------------
void Login(::google::protobuf::RpcController* controller,
const ::ik::LoginRequest* request,
::ik::LoginResponse* response,
::google::protobuf::Closure* done);
void Register(::google::protobuf::RpcController* controller,
const ::ik::RegisterRequest* request,
::ik::RegisterResponse* response,
::google::protobuf::Closure* done);
private:
::google::protobuf::RpcChannel* channel_;
bool owns_channel_;
GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(UserServiceRpc_Stub);
};
我們?cè)诙x樁類的時(shí)候,會(huì)傳入一個(gè)RpcCannel的指針,這個(gè)綁定到這個(gè)樁類的channel_指針。
當(dāng)我們?nèi)フ{(diào)用這個(gè)樁類的Login方法的時(shí)候,會(huì)去調(diào)用傳遞進(jìn)來(lái)的channel的CallMethod方法:
const ::ik::LoginRequest* request,
::ik::LoginResponse* response,
::google::protobuf::Closure* done) {
channel_->CallMethod(descriptor()->method(0),
controller, request, response, done);
}
所以,顯而易見(jiàn),我去發(fā)送這些方法啊,參數(shù)啊,都是在CallMethod這個(gè)方法中執(zhí)行的。
那么CallMethod里面執(zhí)行的內(nèi)容對(duì)我們理解RPC調(diào)用體系至關(guān)重要(代碼比較長(zhǎng),可以直接跳過(guò)聽(tīng)結(jié)論):
google::protobuf::RpcController *controller,
const google::protobuf::Message *request,
google::protobuf::Message *response,
google::protobuf::Closure *done)
{
const google::protobuf::ServiceDescriptor *service_des = method->service();
string service_name = service_des->name();
string method_name = method->name();
//獲取參數(shù)的序列化字符串長(zhǎng)度 args_size
int args_size = 0;
string args_str;
if (request->SerializeToString(&args_str))
{
args_size = args_str.size();
}
else
{
controller->SetFailed("serialize request error!");
return;
}
//定義rpc的請(qǐng)求header
ikrpc::RpcHeader rpc_header;
rpc_header.set_service_name(service_name);
rpc_header.set_method_name(method_name);
rpc_header.set_args_size(args_size);
uint32_t header_size = 0;
string rpc_header_str;
if (rpc_header.SerializeToString(&rpc_header_str))
{
//序列化成功
header_size = rpc_header_str.size();
}
else
{
//序列化失敗
controller->SetFailed("serialize rpc_header error!");
return;
}
//組織待發(fā)送的rpc請(qǐng)求的字符串
string send_rpc_str;
send_rpc_str.insert(0, string((char *)&header_size, 4)); //header_size
send_rpc_str += rpc_header_str; //rpc_header
send_rpc_str += args_str; //args_str
//使用tcp編程,完成rpc方法的遠(yuǎn)程調(diào)用
int client_fd = socket(AF_INET, SOCK_STREAM, 0);
if (client_fd == -1)
{
close(client_fd);
RPC_LOG_FATAL("create socket error! errno:%d", errno);
}
//獲取ip和port
ZookeeperClient zk_client;
zk_client.start();
string method_path = "/" + service_name + "/" + method_name;
string host_data = zk_client.get_data(method_path.c_str());
if(host_data == "")
{
controller->SetFailed(method_path+" is not exist");
return;
}
int host_index = host_data.find(":");
if(host_index == -1)
{
controller->SetFailed(method_path+" address is invalid!");
return;
}
string ip = host_data.substr(0,host_index);
uint16_t port = atoi(host_data.substr(host_index+1,host_data.size()-host_index).c_str());
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = inet_addr(ip.c_str());
if (connect(client_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1)
{
close(client_fd);
RPC_LOG_FATAL("connet error! errno: %d", errno);
}
// 發(fā)送rpc請(qǐng)求
if (send(client_fd, send_rpc_str.c_str(), send_rpc_str.size(), 0) == -1)
{
controller->SetFailed("send error! errno: " + errno);
close(client_fd);
return;
}
//接受rpc請(qǐng)求
char recv_buffer[BUFF_SIZE] = {0};
ssize_t recv_size = recv(client_fd, recv_buffer, BUFF_SIZE, 0);
if (recv_size == -1)
{
controller->SetFailed("recv error! errno: " + errno);
close(client_fd);
return;
}
//反序列化響應(yīng)數(shù)據(jù)
//String 因?yàn)橛龅?會(huì)認(rèn)為是字符串結(jié)束,所以用Array
if (!response->ParseFromArray(recv_buffer, recv_size))
{
string recv = recv_buffer;
controller->SetFailed("parse error! response_str: " + recv);
close(client_fd);
return;
}
close(client_fd);
}
- 組織要發(fā)送的 request_str 字符串
- 從zookeeper中拿到服務(wù)端的 ip 和 port,連接服務(wù)端
- 發(fā)送 request_str
- 接受服務(wù)端返回過(guò)來(lái)的 response 字符串并反序列化出結(jié)果
總結(jié)一下RPC流程
-
RPC
+關(guān)注
關(guān)注
0文章
111瀏覽量
11537 -
源碼
+關(guān)注
關(guān)注
8文章
641瀏覽量
29216 -
函數(shù)
+關(guān)注
關(guān)注
3文章
4331瀏覽量
62630 -
遠(yuǎn)程過(guò)程調(diào)用
+關(guān)注
關(guān)注
0文章
2瀏覽量
708
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論