为什么需要踢人逻辑?
同账户异地登陆,需要将原有的连接断开,先发送下线信息到旧账号的客户端,然后关闭连接.
客户端去断开连接,避免服务器主动断开造成大量time_wait问题.
为什么需要分布式锁?C/C++
新客户端登录,需要通过redis存储用户uid与分配的服务器ip;而旧客户端下线,也会抹掉uid对应的服务器ip,存在一个互斥资源的访问,故而需要分布式锁.


分布式锁设计逻辑
这里使用redis作为锁存储.
客户端通过设置redis的键来加锁,通过redis的原子操作保证只有一个客户端成功设置该键,加锁时生成uuid以标识锁持有者.只有锁持有者才能释放锁
尝试加锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| // 尝试获取锁,返回锁的唯一标识符(UUID),如果获取失败则返回空字符串 std::string acquireLock(redisContext* context, const std::string& lockName, int lockTimeout, int acquireTimeout) { std::string identifier = generateUUID(); std::string lockKey = "lock:" + lockName; auto endTime = std::chrono::steady_clock::now() + std::chrono::seconds(acquireTimeout);
while (std::chrono::steady_clock::now() < endTime) { // 使用 SET 命令尝试加锁:SET lockKey identifier NX EX lockTimeout redisReply* reply = (redisReply*)redisCommand(context, "SET %s %s NX EX %d", lockKey.c_str(), identifier.c_str(), lockTimeout); if (reply != nullptr) { // 判断返回结果是否为 OK if (reply->type == REDIS_REPLY_STATUS && std::string(reply->str) == "OK") { freeReplyObject(reply); return identifier; } freeReplyObject(reply); } // 暂停 1 毫秒后重试,防止忙等待 std::this_thread::sleep_for(std::chrono::milliseconds(1)); } return ""; }
|
参数说明:
redisContext* context,指向redis连接的上下文,用来与redis服务器通信,通过该上下文发送命令,获取响应.
const std::string& lockName.想要加锁的资源名称,这里应该是uid
int acquireTimeout.获取锁的最大等待时间,避免函数无限等待
int lockTimeout.锁的有效时间,超时机制,防止程序异常导致死锁
set 命令参数说明:
- SET
Redis 的基本命令,用于设置一个 key 的值。
- %s(第一个 %s)
代表锁的 key(例如 "lock:my_resource"
)。
- %s(第二个 %s)
代表锁的持有者标识符,也就是通过 generateUUID()
生成的 UUID。
- NX
表示 “Not eXists”,意思是“只有当 key 不存在时才进行设置”。这可以保证如果其他客户端已经设置了这个 key(即已经有锁了),那么当前客户端就不会覆盖原来的锁。
- EX %d
EX
参数用于指定 key 的过期时间,%d
表示锁的有效期(lockTimeout),单位为秒。这样即使客户端因某些原因没有正常释放锁,锁也会在指定时间后自动失效。
释放锁
释放锁的操作使用 Redis Lua 脚本,确保只有持有锁的客户端才能释放锁。脚本通过判断当前锁的持有者是否与传入的标识符一致来决定是否删除锁。
LUA脚本如下:
1 2 3 4 5 6
| if redis.call('get', KEYS[1]) == ARGV[1] then //检查redis中key对应的获取到的值是否与传入的 identifier 相同,只有标识符匹配时才能删除锁。 return redis.call('del', KEYS[1]) else return 0 end
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| // 释放锁,只有锁的持有者才能释放,返回是否成功 bool releaseLock(redisContext* context, const std::string& lockName, const std::string& identifier) { std::string lockKey = "lock:" + lockName; // Lua 脚本:判断锁标识是否匹配,匹配则删除锁 const char* luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then \ return redis.call('del', KEYS[1]) \ else \ return 0 \ end"; // 调用 EVAL 命令执行 Lua 脚本,第一个参数为脚本,后面依次为 key 的数量、key 以及对应的参数 redisReply* reply = (redisReply*)redisCommand(context, "EVAL %s 1 %s %s", luaScript, lockKey.c_str(), identifier.c_str()); bool success = false; if (reply != nullptr) { // 当返回整数值为 1 时,表示成功删除了锁 if (reply->type == REDIS_REPLY_INTEGER && reply->integer == 1) { success = true; } freeReplyObject(reply); } return success; }
|
踢人逻辑
对于踢人逻辑,最难的就是思考如何加锁和解锁,进行踢人,以保证将来分布式登录也会安全。
这里我们先考虑几个情形
- B新登录,此时A已登录,这种最简单,根据uid找到A的session发送踢人通知
- B新登录,此时A将下线,这种要保证B和A互斥,要么B先登陆完,A再下线,要么A先下线,B再登录。这么做的好处就是保证互斥
如果B先登录,会将uid对应的session更新为最新的。A下线时会优先查找uid对应的session,发现不是自己,则直接退出即可,同时不需要修改uid对应的session为空。
如果A先退出,A下线时会优先查找uid对应的session, 发现uid对应的session和自己的连接吻合,则会将uid对应的session设置为空,然后B登录,将uid对应的session设置为新连接,这样是安全的。
互斥实现需要分布式锁.
- B登录,A退出,此时C查找uid发送消息,三个操作都会添加分布式锁。谁先竞争到锁谁操作,能保证操作的互斥。
基本就是这三种情况。接下来我们回顾下uid和Session的对应关系
主要是对删除session函数做了调整
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| void UserMgr::RmvUserSession(int uid,std::string session_id) { //参数的session_id是用redis存的
{ std::lock_guard<std::mutex> lock(_session_mtx); auto iter = _uid_to_session.find(uid); if (iter != _uid_to_session.end()) { return; } auto session_id_ = iter->second->GetSessionId();//这个是根据本链接获取的sessionid,外部传的是redis里面的 if (session_id != session_id_) {//说明其他地方登录了 return; } //如果相等说明在本服务器中,根据uid去移除会话 _uid_to_session.erase(uid); }
}
|
注意:锁的精度尤其重要,对于类似上面的接口里面的并发操作中,尽量把锁写在一个一个函数中,函数结束,锁就释放
这里也只保证了线程安全,调用该函数的外层,还需要保证进程安全.
用户管理类中我只加了线程锁,没有加分布式锁,是因为我能确定既然走到这里了,一定是我外围加了分布式锁,找到了对应的服务器,再执行上述函数,所以在这样一个原子化的层级只加了线程锁以保证不会死锁.
登录流程加锁
收到登录请求会做如下事情
判断token
和uid
是否合理
根据uid
构造分布式锁key
,然后实现分布式锁加锁操作。比如uid
为1001,则分布式锁的key为”lock_1001”
加锁后通过defer自动析构解锁
通过uid
获取用户之前登录的服务器,如果存在则说明uid对应的用户还在线,此时要做踢人,判断serverip和现在的服务器ip是否相等,如果相等则说明是
本服务器踢人,只需要通过线程锁控制好并发逻辑即可,将uid
对应的旧session
发送信息通知客户端下线,并且将旧session
从server
中移除。
如果不是本服务器,则要做跨服踢人,调用grpc
踢人即可,留作之后做。
- 登录成功后,要将
uid
和对应的ip
信息写入redis
,方便以后跨服查找。另外uid
对应的session
信息也要写入redis
, 同时将uid
和session
关联,这样可以通过uid
快速找到session
在原先登录逻辑中,验证token合理后,获取用户信息前,进行加锁,使得线程独占登录
上锁之后,那么后面在redis中的一些操作获取到的数据就不会被其他线程修改,比如这里get ip.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| auto lock_key = LOCK_PREFIX + uid_str; auto identifier = RedisMgr::GetInstance()->acquireLock(lock_key, LOCK_TIME_OUT, ACQUIRE_TIME_OUT); //利用defer解锁 Defer defer2([this, identifier, lock_key]() { RedisMgr::GetInstance()->releaseLock(lock_key, identifier); }); //此处判断该用户是否在别处或者本服务器登录
std::string uid_ip_value = ""; auto uid_ip_key = USERIPPREFIX + uid_str; bool b_ip = RedisMgr::GetInstance()->Get(uid_ip_key, uid_ip_value); //说明用户已经登录了,此处应该踢掉之前的用户登录状态 if (b_ip) { //获取当前服务器ip信息 auto& cfg = ConfigMgr::Inst(); auto self_name = cfg["SelfServer"]["Name"]; //如果之前登录的服务器和当前相同,则直接在本服务器踢掉 if (uid_ip_value == self_name) { //查找旧有的连接 auto old_session = UserMgr::GetInstance()->GetSession(uid);
//此处应该发送踢人消息 if (old_session) { old_session->NotifyOffline(uid); //清除旧的连接 _p_server->ClearSession(old_session->GetSessionId()); }
} else { //如果不是本服务器,则通知grpc通知其他服务器踢掉 } }
|
服务器检测到离线的处理
服务器也会检测到离线也会清理连接,但是要注意,连接可以不按照分布式锁加锁清理,但是连接的信息要加分布式锁后再更新。
比如是否将uid
对应的session
更新到redis
中,因为很可能用户在别的新服务器登录,新服务器给旧的客户端通知离线,旧的客户端不按理连接,导致旧的服务器检测连接断开,此时不能将uid
对应的session
清空,因为uid
对应的session
已经被新服务器更新了。

在发送和接收的时候都可能检测到对方离线而报错,而可能该用户又去新的ip登录了,登录会加上分布式锁,离线的时候也要加上分布式锁,如果加锁失败说明同一个用户正在登录,已经加上锁了,需要等他登录完.在AsyncReadBody
和AsyncReadHead
以及AsyncWrite
等错误处理的时候加上连接清理操作
我们以读取body为例
只要连接错误,那么这个连接就没什么意义了,Close()掉,加锁成功后获取redis中存储的session_id.
判断redis的会话id和本服务器上本身存的会话id是否一致,不一致说明异地登陆结束了,直接返回(没有必要去再次作redis中清除用户登录信息操作了,因为已经异地登陆完成,redis中用户登录信息都是新的,万万不可删除)
若一致,则清除redis中用户信息(用户关联的session、服务器ip)
在解锁前,还要对本服务器进行会话清理,写在defer里面了,不要忘记
如下可知在外围分布式锁里面加了一个线程锁,移除本服务器上用户和会话的关联,毕竟线程锁里面在遍历,不加锁还是比较危险的.对CServer操作加锁保证多个线程clearsession的顺序后,RmvUserSession时也对UserMgr加了锁.
1 2 3 4 5 6 7 8 9 10
| void CServer::ClearSession(std::string session_id) { lock_guard<mutex> lock(_mutex); if (_sessions.find(session_id) != _sessions.end()) { //移除用户和session的关联 auto uid = _sessions[session_id]->GetUserId(); UserMgr::GetInstance()->RmvUserSession(uid,session_id); } _sessions.erase(session_id); }
|
void UserMgr::RmvUserSession(int uid,std::string session_id)
{
{
std::lock_guard<std::mutex> lock(_session_mtx);
auto iter = _uid_to_session.find(uid);
if (iter != _uid_to_session.end()) {
return;
}
auto old_session_id = iter->second->GetSessionId();
if (session_id != old_session_id) {//说明其他地方登录了
return;
}
_uid_to_session.erase(uid);
}
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| void CSession::AsyncReadBody(int total_len) { auto self = shared_from_this(); asyncReadFull(total_len, [self, this, total_len](const boost::system::error_code& ec, std::size_t bytes_transfered) { try { if (ec) { std::cout << "handle read failed, error is " << ec.what() << endl; Close();
//加锁清除session auto uid_str = std::to_string(_user_uid); auto lock_key = LOCK_PREFIX + uid_str; auto identifier = RedisMgr::GetInstance()->acquireLock(lock_key, LOCK_TIME_OUT, ACQUIRE_TIME_OUT); Defer defer([identifier, lock_key,self,this]() { _server->ClearSession(_session_id); RedisMgr::GetInstance()->releaseLock(lock_key, identifier); });
if (identifier.empty()) { return;//这次尝试失败,就return交给其他线程 } std::string redis_session_id = ""; auto bsuccess = RedisMgr::GetInstance()->Get(USER_SESSION_PREFIX + uid_str, redis_session_id); if (!bsuccess) { return; }
if (redis_session_id != _session_id) { //说明有客户在其他服务器异地登录了 return; }
RedisMgr::GetInstance()->Del(USER_SESSION_PREFIX + uid_str); //清除用户登录信息 RedisMgr::GetInstance()->Del(USERIPPREFIX + uid_str); return; }
if (bytes_transfered < total_len) { std::cout << "read length not match, read [" << bytes_transfered << "] , total [" << total_len<<"]" << endl; Close(); _server->ClearSession(_session_id); return; }
memcpy(_recv_msg_node->_data , _data , bytes_transfered); _recv_msg_node->_cur_len += bytes_transfered; _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0'; cout << "receive data is " << _recv_msg_node->_data << endl; //此处将消息投递到逻辑队列中 LogicSystem::GetInstance()->PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node)); //继续监听头部接受事件 AsyncReadHead(HEAD_TOTAL_LEN); } catch (std::exception& e) { std::cout << "Exception code is " << e.what() << endl; } }); }
|