RPC封装

因为跨服踢人,所以要调用Grpc踢人,我们在message.proto中添加踢人消息

1
2
3
4
5
6
7
8
message KickUserReq{
int32 uid = 1;
}

message KickUserRsp{
int32 error = 1;
int32 uid = 2;
}

同时添加服务调用

1
2
3
4
service ChatService {
//...其他服务略去
rpc NotifyKickUser(KickUserReq) returns (KickUserRsp){}
}

编写bat脚本自动生成, start.bat内容如下

1
2
3
4
5
6
7
8
9
10
11
12
@echo off
set PROTOC_PATH=D:\cppsoft\grpc\visualpro\third_party\protobuf\Debug\protoc.exe
set GRPC_PLUGIN_PATH=D:\cppsoft\grpc\visualpro\Debug\grpc_cpp_plugin.exe
set PROTO_FILE=message.proto

echo Generating gRPC code...
%PROTOC_PATH% -I="." --grpc_out="." --plugin=protoc-gen-grpc="%GRPC_PLUGIN_PATH%" "%PROTO_FILE%"

echo Generating C++ code...
%PROTOC_PATH% --cpp_out=. "%PROTO_FILE%"

echo Done.

双击start.bat或者在cmd中执行start.bat也可以

执行后可以发现产生了四个文件

image-20250419114210735

跨服踢人示意图

image-20250419115212041

逻辑编写

StatusServer动态分配

StatusServer中修改动态分配server逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
ChatServer StatusServiceImpl::getChatServer() {
std::lock_guard<std::mutex> guard(_server_mtx);
auto minServer = _servers.begin()->second;
auto lock_key = LOCK_COUNT;
auto identifier = RedisMgr::GetInstance()->acquireLock(lock_key, LOCK_TIME_OUT, ACQUIRE_TIME_OUT);
//利用defer解锁
Defer defer2([this, identifier, lock_key]() {
RedisMgr::GetInstance()->releaseLock(lock_key, identifier);
});

auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, minServer.name);
if (count_str.empty()) {
//不存在则默认设置为最大
minServer.con_count = INT_MAX;
}
else {
minServer.con_count = std::stoi(count_str);
}


// 使用范围基于for循环
for ( auto& server : _servers) {

if (server.second.name == minServer.name) {
continue;
}

auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, server.second.name);
if (count_str.empty()) {
server.second.con_count = INT_MAX;
}
else {
server.second.con_count = std::stoi(count_str);
}

if (server.second.con_count < minServer.con_count) {
minServer = server.second;
}
}

return minServer;
}

注意这里用到了另一个分布式锁,用来控制服务器人数记录

1
auto identifier = RedisMgr::GetInstance()->acquireLock(lock_key, LOCK_TIME_OUT, ACQUIRE_TIME_OUT);

ChatServer踢人逻辑

ChatSever中登录逻辑里添加跨服踢人调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
void LogicSystem::LoginHandler(shared_ptr<CSession> session, const short &msg_id, const string &msg_data) {
Json::Reader reader;
Json::Value root;
reader.parse(msg_data, root);
auto uid = root["uid"].asInt();
auto token = root["token"].asString();
std::cout << "user login uid is " << uid << " user token is "
<< token << endl;

Json::Value rtvalue;
Defer defer([this, &rtvalue, session]() {
std::string return_str = rtvalue.toStyledString();
session->Send(return_str, MSG_CHAT_LOGIN_RSP);
});


//从redis获取用户token是否正确
std::string uid_str = std::to_string(uid);
std::string token_key = USERTOKENPREFIX + uid_str;
std::string token_value = "";
bool success = RedisMgr::GetInstance()->Get(token_key, token_value);
if (!success) {
rtvalue["error"] = ErrorCodes::UidInvalid;
return ;
}

if (token_value != token) {
rtvalue["error"] = ErrorCodes::TokenInvalid;
return ;
}

rtvalue["error"] = ErrorCodes::Success;


std::string base_key = USER_BASE_INFO + uid_str;
auto user_info = std::make_shared<UserInfo>();
bool b_base = GetBaseInfo(base_key, uid, user_info);
if (!b_base) {
rtvalue["error"] = ErrorCodes::UidInvalid;
return;
}
rtvalue["uid"] = uid;
rtvalue["pwd"] = user_info->pwd;
rtvalue["name"] = user_info->name;
rtvalue["email"] = user_info->email;
rtvalue["nick"] = user_info->nick;
rtvalue["desc"] = user_info->desc;
rtvalue["sex"] = user_info->sex;
rtvalue["icon"] = user_info->icon;

//从数据库获取申请列表
std::vector<std::shared_ptr<ApplyInfo>> apply_list;
auto b_apply = GetFriendApplyInfo(uid, apply_list);
if (b_apply) {
for (auto& apply : apply_list) {
Json::Value obj;
obj["name"] = apply->_name;
obj["uid"] = apply->_uid;
obj["icon"] = apply->_icon;
obj["nick"] = apply->_nick;
obj["sex"] = apply->_sex;
obj["desc"] = apply->_desc;
obj["status"] = apply->_status;
rtvalue["apply_list"].append(obj);
}
}

//获取好友列表
std::vector<std::shared_ptr<UserInfo>> friend_list;
bool b_friend_list = GetFriendList(uid, friend_list);
for (auto& friend_ele : friend_list) {
Json::Value obj;
obj["name"] = friend_ele->name;
obj["uid"] = friend_ele->uid;
obj["icon"] = friend_ele->icon;
obj["nick"] = friend_ele->nick;
obj["sex"] = friend_ele->sex;
obj["desc"] = friend_ele->desc;
obj["back"] = friend_ele->back;
rtvalue["friend_list"].append(obj);
}

auto server_name = ConfigMgr::Inst().GetValue("SelfServer", "Name");
{
//此处添加分布式锁,让该线程独占登录
//拼接用户ip对应的key
auto lock_key = LOCK_PREFIX + uid_str;
auto identifier = RedisMgr::GetInstance()->acquireLock(lock_key, LOCK_TIME_OUT, ACQUIRE_TIME_OUT);
//利用defer解锁
Defer defer2([this, identifier, lock_key]() {
RedisMgr::GetInstance()->releaseLock(lock_key, identifier);
});
//此处判断该用户是否在别处或者本服务器登录

std::string uid_ip_value = "";
auto uid_ip_key = USERIPPREFIX + uid_str;
bool b_ip = RedisMgr::GetInstance()->Get(uid_ip_key, uid_ip_value);
//说明用户已经登录了,此处应该踢掉之前的用户登录状态
if (b_ip) {
//获取当前服务器ip信息
auto& cfg = ConfigMgr::Inst();
auto self_name = cfg["SelfServer"]["Name"];
//如果之前登录的服务器和当前相同,则直接在本服务器踢掉
if (uid_ip_value == self_name) {
//查找旧有的连接
auto old_session = UserMgr::GetInstance()->GetSession(uid);

//此处应该发送踢人消息
if (old_session) {
old_session->NotifyOffline(uid);
//清除旧的连接
_p_server->ClearSession(old_session->GetSessionId());
}

}
else {
//如果不是本服务器,则通知grpc通知其他服务器踢掉
//发送通知
KickUserReq kick_req;
kick_req.set_uid(uid);
ChatGrpcClient::GetInstance()->NotifyKickUser(uid_ip_value, kick_req);
}
}

//session绑定用户uid
session->SetUserId(uid);
//为用户设置登录ip server的名字
std::string ipkey = USERIPPREFIX + uid_str;
RedisMgr::GetInstance()->Set(ipkey, server_name);
//uid和session绑定管理,方便以后踢人操作
UserMgr::GetInstance()->SetUserSession(uid, session);
std::string uid_session_key = USER_SESSION_PREFIX + uid_str;
RedisMgr::GetInstance()->Set(uid_session_key, session->GetSessionId());

}

RedisMgr::GetInstance()->IncreaseCount(server_name);
return;
}

注意上面代码,这段代码就是跨服踢人逻辑。

1
2
3
4
5
6
7
else {
//如果不是本服务器,则通知grpc通知其他服务器踢掉
//发送通知
KickUserReq kick_req;
kick_req.set_uid(uid);
ChatGrpcClient::GetInstance()->NotifyKickUser(uid_ip_value, kick_req);
}

关于KickUserReq其实是我们在message.pb.h中生成的。但是我们在自己的文件中使用要用作用域messag::, 所以我们在GrpcClient.h中添加声明

1
2
using message::KickUserReq;
using message::KickUserRsp;

以后我们包含GrpcClient.h就可以使用这些类了。

封装rpc踢人

接下来我们封装rpc接口实现踢人逻辑

rpc客户端接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
KickUserRsp ChatGrpcClient::NotifyKickUser(std::string server_ip, const KickUserReq& req)
{
KickUserRsp rsp;
Defer defer([&rsp, &req]() {
rsp.set_error(ErrorCodes::Success);
rsp.set_uid(req.uid());
});

auto find_iter = _pools.find(server_ip);
if (find_iter == _pools.end()) {
return rsp;
}

auto& pool = find_iter->second;
ClientContext context;
auto stub = pool->getConnection();
Defer defercon([&stub, this, &pool]() {
pool->returnConnection(std::move(stub));
});
Status status = stub->NotifyKickUser(&context, req, &rsp);

if (!status.ok()) {
rsp.set_error(ErrorCodes::RPCFailed);
return rsp;
}

return rsp;
}

rpc服务端接口实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Status ChatServiceImpl::NotifyKickUser(::grpc::ServerContext* context, 
const KickUserReq* request, KickUserRsp* reply)
{
//查找用户是否在本服务器
auto uid = request->uid();
auto session = UserMgr::GetInstance()->GetSession(uid);

Defer defer([request, reply]() {
reply->set_error(ErrorCodes::Success);
reply->set_uid(request->uid());
});

//用户不在内存中则直接返回
if (session == nullptr) {
return Status::OK;
}

//在内存中则直接发送通知对方
session->NotifyOffline(uid);
//清除旧的连接
_p_server->ClearSession(session->GetSessionId());

return Status::OK;
}

为了让ChatServiceImpl 获取CServer, 所以我们提供了注册函数

1
2
3
4
void ChatServiceImpl::RegisterServer(std::shared_ptr<CServer> pServer)
{
_p_server = pServer;
}

这个函数在main函数中启动grpc服务前注册即可。

登录数量统计

StatusServer中利用分布式锁获取登录数量,动态分配Server给客户端,这里我们也要用ChatServer启动和退出时清空登录数量

重新调整ChatServer启动逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
using namespace std;
bool bstop = false;
std::condition_variable cond_quit;
std::mutex mutex_quit;

int main()
{
auto& cfg = ConfigMgr::Inst();
auto server_name = cfg["SelfServer"]["Name"];
try {
auto pool = AsioIOServicePool::GetInstance();
//将登录数设置为0
RedisMgr::GetInstance()->InitCount(server_name);
Defer derfer ([server_name]() {
RedisMgr::GetInstance()->HDel(LOGIN_COUNT, server_name);
RedisMgr::GetInstance()->Close();
});

boost::asio::io_context io_context;
auto port_str = cfg["SelfServer"]["Port"];
//创建Cserver智能指针
auto pointer_server = std::make_shared<CServer>(io_context, atoi(port_str.c_str()));
//定义一个GrpcServer
std::string server_address(cfg["SelfServer"]["Host"] + ":" + cfg["SelfServer"]["RPCPort"]);
ChatServiceImpl service;
grpc::ServerBuilder builder;
// 监听端口和添加服务
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
service.RegisterServer(pointer_server);
// 构建并启动gRPC服务器
std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
std::cout << "RPC Server listening on " << server_address << std::endl;

//单独启动一个线程处理grpc服务
std::thread grpc_server_thread([&server]() {
server->Wait();
});

boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
signals.async_wait([&io_context, pool, &server](auto, auto) {
io_context.stop();
pool->Stop();
server->Shutdown();
});

//将Cserver注册给逻辑类方便以后清除连接
LogicSystem::GetInstance()->SetServer(pointer_server);
io_context.run();

grpc_server_thread.join();
}
catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << endl;
}

}

上面的逻辑有这样一段,要格外注意

1
2
3
4
5
6
//将登录数设置为0
RedisMgr::GetInstance()->InitCount(server_name);
Defer derfer ([server_name]() {
RedisMgr::GetInstance()->HDel(LOGIN_COUNT, server_name);
RedisMgr::GetInstance()->Close();
});

这段逻辑是在服务器启动后将对应服务器中连接数清零写入redis,在服务器结束后从redis中删除数量信息,最后关闭Redis连接池