优化代码

main
gzydong 2021-05-22 14:47:46 +08:00
parent 90d9b67f22
commit 08618ee3f3
9 changed files with 205 additions and 95 deletions

View File

@ -343,7 +343,7 @@ class ChatMessageConsumer extends ConsumerMessage
{ {
$server = server(); $server = server();
foreach ($fds as $fd) { foreach ($fds as $fd) {
$server->exist($fd) && $server->push($fd, $message); $server->exist(intval($fd)) && $server->push(intval($fd), $message);
} }
} }

View File

@ -47,7 +47,7 @@ class ChatMessageProducer extends ProducerMessage
public function __construct(string $event, array $data, array $options = []) public function __construct(string $event, array $data, array $options = [])
{ {
$message = [ $message = [
'uuid' => $this->uuid(), 'uuid' => uniqid(),
'event' => $event, 'event' => $event,
'data' => $data, 'data' => $data,
'options' => $options 'options' => $options
@ -55,14 +55,4 @@ class ChatMessageProducer extends ProducerMessage
$this->payload = $message; $this->payload = $message;
} }
/**
* 生成唯一的消息ID
*
* @return string
*/
private function uuid()
{
return Str::random(8) . uniqid();
}
} }

View File

@ -26,15 +26,27 @@ abstract class AbstractRedis
/** /**
* 获取缓存 KEY * 获取缓存 KEY
* *
* @param string $key * @param string|array $key
* @return string * @return string
*/ */
protected function getCacheKey($key = '') protected function getCacheKey($key = '')
{ {
return implode(':', array_filter([ $params = [$this->prefix, $this->name];
trim($this->prefix, ':'), if (is_array($key)) {
trim($this->name, ':'), $params = array_merge($params, $key);
trim($key, ':') } else {
])); $params[] = $key;
}
return $this->filter($params);
}
protected function filter(array $params = [])
{
foreach ($params as $k => $param) {
$params[$k] = trim($param, ':');
}
return implode(':', array_filter($params));
} }
} }

View File

@ -4,7 +4,7 @@ namespace App\Cache\Repository;
class HashGroupRedis extends AbstractRedis class HashGroupRedis extends AbstractRedis
{ {
protected $prefix = 'rds-hash:multi'; protected $prefix = 'rds-hash';
protected $name = 'default'; protected $name = 'default';
@ -20,11 +20,11 @@ class HashGroupRedis extends AbstractRedis
} }
/** /**
* @param string $name * @param string $name
* @param $key * @param string|int $key
* @return false|string * @return false|string
*/ */
public function get(string $name, $key) public function get(string $name, string $key)
{ {
return $this->redis()->hGet($this->getCacheKey($name), $key); return $this->redis()->hGet($this->getCacheKey($name), $key);
} }
@ -77,4 +77,9 @@ class HashGroupRedis extends AbstractRedis
{ {
return $this->redis()->hLen($this->getCacheKey($name)); return $this->redis()->hLen($this->getCacheKey($name));
} }
public function delete(string $name)
{
return $this->redis()->del($this->getCacheKey($name));
}
} }

View File

@ -0,0 +1,51 @@
<?php
namespace App\Cache;
use App\Cache\Repository\HashGroupRedis;
/**
* :客户端ID与用户ID绑定(多对一关系)
*
* @package App\Cache
*/
class SocketFdBindUser extends HashGroupRedis
{
protected $name = 'ws:fd-user';
/**
* 添加绑定
*
* @param int $fd 客户端ID
* @param int $user_id 用户ID
* @param string $run_id 服务运行ID默认当前服务ID
* @return bool|int
*/
public function bind(int $fd, int $user_id, $run_id = SERVER_RUN_ID)
{
return $this->add($run_id, strval($fd), $user_id);
}
/**
* 解除绑定
*
* @param string $run_id 服务运行ID默认当前服务ID
* @return bool|int
*/
public function unBind(int $fd, $run_id = SERVER_RUN_ID)
{
return $this->rem($run_id, strval($fd));
}
/**
* 询客户端 FD 对应的用户ID
*
* @param int $fd 客户端ID
* @param string $run_id 服务运行ID默认当前服务ID
* @return int
*/
public function findUserId(int $fd, $run_id = SERVER_RUN_ID)
{
return (int)$this->get($run_id, strval($fd)) ?: 0;
}
}

View File

@ -0,0 +1,88 @@
<?php
namespace App\Cache;
use App\Cache\Repository\SetGroupRedis;
/**
* :用户ID与客户端ID绑定(一对多关系)
*
* @package App\Cache
*/
class SocketUserBindFds extends SetGroupRedis
{
protected $name = 'ws:user-fds';
/**
* @param int $fd 客户端ID
* @param int $user_id 用户ID
* @param string $run_id 服务运行ID默认当前服务ID
* @return bool|int
*/
public function bind(int $fd, int $user_id, $run_id = SERVER_RUN_ID)
{
return $this->add($this->filter([$run_id, $user_id]), $fd);
}
/**
* @param int $fd 客户端ID
* @param string $run_id 服务运行ID默认当前服务ID
* @return int
*/
public function unBind(int $fd, int $user_id, $run_id = SERVER_RUN_ID)
{
return $this->rem($this->filter([$run_id, $user_id]), $fd);
}
/**
* 检测用户当前是否在线(指定运行服务器)
*
* @param int $user_id 用户ID
* @param string $run_id 服务运行ID默认当前服务ID
* @return bool
*/
public function isOnline(int $user_id, $run_id = SERVER_RUN_ID): bool
{
return (bool)$this->count($this->filter([$run_id, $user_id]));
}
/**
* 检测用户当前是否在线(查询所有在线服务器)
*
* @param int $user_id 用户ID
* @param array $run_ids 服务运行ID默认当前服务ID
* @return bool
*/
public function isOnlineAll(int $user_id, array $run_ids = []): bool
{
$run_ids = $run_ids ?: ServerRunID::getInstance()->getServerRunIdAll();
foreach ($run_ids as $run_id => $time) {
if ($this->isOnline($user_id, $run_id)) return true;
}
return false;
}
/**
* 查询用户的客户端fd集合(用户可能存在多端登录)
*
* @param int $user_id 用户ID
* @param string $run_id 服务运行ID默认当前服务ID
* @return array
*/
public function findFds(int $user_id, $run_id = SERVER_RUN_ID)
{
$arr = $this->all($this->filter([$run_id, $user_id]));
foreach ($arr as $k => $value) {
$arr[$k] = intval($value);
}
return $arr;
}
public function getCachePrefix(string $run_id)
{
return $this->getCacheKey($run_id);
}
}

View File

@ -12,10 +12,11 @@ declare(strict_types=1);
namespace App\Command; namespace App\Command;
use App\Cache\ServerRunID; use App\Cache\ServerRunID;
use App\Cache\SocketFdBindUser;
use App\Cache\SocketUserBindFds;
use Hyperf\Command\Annotation\Command; use Hyperf\Command\Annotation\Command;
use Hyperf\Command\Command as HyperfCommand; use Hyperf\Command\Command as HyperfCommand;
use Psr\Container\ContainerInterface; use Psr\Container\ContainerInterface;
use App\Service\SocketClientService;
/** /**
* @Command * @Command
@ -42,17 +43,32 @@ class RemoveWsCacheCommand extends HyperfCommand
public function handle() public function handle()
{ {
$socket = new SocketClientService();
$this->line('此过程可能耗时较长,请耐心等待!', 'info'); $this->line('此过程可能耗时较长,请耐心等待!', 'info');
// 获取所有已停止运行的服务ID // 获取所有已停止运行的服务ID
$arr = ServerRunID::getInstance()->getServerRunIdAll(2); $arr = ServerRunID::getInstance()->getServerRunIdAll(2);
foreach ($arr as $run_id => $value) { foreach ($arr as $run_id => $value) {
go(function () use ($socket, $run_id) { $this->clear($run_id);
$socket->removeRedisCache(strval($run_id));
});
} }
$this->line('缓存已清除!', 'info'); $this->line('缓存已清除!', 'info');
} }
public function clear(string $run_id)
{
ServerRunID::getInstance()->rem($run_id);
SocketFdBindUser::getInstance()->delete($run_id);
$prefix = SocketUserBindFds::getInstance()->getCachePrefix($run_id);
$iterator = null;
while (true) {
$keys = redis()->scan($iterator, "{$prefix}*", 20);
if ($keys === false) return;
if (!empty($keys)) {
redis()->del(...$keys);
}
}
}
} }

View File

@ -15,7 +15,9 @@ use App\Cache\Repository\SetRedis;
use App\Cache\Repository\StreamRedis; use App\Cache\Repository\StreamRedis;
use App\Cache\Repository\StringRedis; use App\Cache\Repository\StringRedis;
use App\Cache\Repository\ZSetRedis; use App\Cache\Repository\ZSetRedis;
use App\Cache\SocketFdBindUser;
use App\Cache\SocketRoom; use App\Cache\SocketRoom;
use App\Cache\SocketUserBindFds;
use App\Cache\UnreadTalk; use App\Cache\UnreadTalk;
use App\Service\TalkService; use App\Service\TalkService;
use Hyperf\Command\Command as HyperfCommand; use Hyperf\Command\Command as HyperfCommand;
@ -134,10 +136,12 @@ class TestCommand extends HyperfCommand
//$socketRoom = SocketRoom::getInstance(); //$socketRoom = SocketRoom::getInstance();
//$socketRoom->addRoomMember(''); //$socketRoom->addRoomMember('');
//$keys = redis()->keys('rds-set*'); //$keys = redis()->keys('rds-set*');
//foreach ($keys as $key) { //foreach ($keys as $key) {
// redis()->del($keys); // redis()->del($keys);
//} //}
//SocketFdBindUser::getInstance()->bind(1, 2054);
//SocketUserBindFds::getInstance()->bind(1, 2054);
} }
} }

View File

@ -2,8 +2,8 @@
namespace App\Service; namespace App\Service;
use App\Cache\ServerRunID; use App\Cache\SocketFdBindUser;
use Hyperf\Redis\Redis; use App\Cache\SocketUserBindFds;
/** /**
* Socket客户端ID服务 * Socket客户端ID服务
@ -12,26 +12,6 @@ use Hyperf\Redis\Redis;
*/ */
class SocketClientService class SocketClientService
{ {
/**
* fd与用户绑定(使用hash 做处理)
*/
const BIND_FD_TO_USER = 'ws:fd:user';
/**
* 使用集合做处理
*/
const BIND_USER_TO_FDS = 'ws:user:fds';
/**
* @var Redis
*/
private $redis;
public function __construct()
{
$this->redis = container()->get(Redis::class);
}
/** /**
* 客户端fd与用户ID绑定关系 * 客户端fd与用户ID绑定关系
* *
@ -41,10 +21,8 @@ class SocketClientService
*/ */
public function bindRelation(int $fd, int $user_id, $run_id = SERVER_RUN_ID) public function bindRelation(int $fd, int $user_id, $run_id = SERVER_RUN_ID)
{ {
$this->redis->multi(); SocketFdBindUser::getInstance()->bind($fd, $user_id, $run_id);
$this->redis->hSet(sprintf('%s:%s', self::BIND_FD_TO_USER, $run_id), (string)$fd, (string)$user_id); SocketUserBindFds::getInstance()->bind($fd, $user_id, $run_id);
$this->redis->sadd(sprintf('%s:%s:%s', self::BIND_USER_TO_FDS, $run_id, $user_id), $fd);
$this->redis->exec();
} }
/** /**
@ -55,10 +33,10 @@ class SocketClientService
*/ */
public function removeRelation(int $fd, $run_id = SERVER_RUN_ID) public function removeRelation(int $fd, $run_id = SERVER_RUN_ID)
{ {
$user_id = $this->findFdUserId($fd) | 0; $user_id = $this->findFdUserId($fd);
$this->redis->hdel(sprintf('%s:%s', self::BIND_FD_TO_USER, $run_id), (string)$fd); SocketFdBindUser::getInstance()->unBind($fd, $run_id);
$this->redis->srem(sprintf('%s:%s:%s', self::BIND_USER_TO_FDS, $run_id, $user_id), $fd); SocketUserBindFds::getInstance()->unBind($fd, $user_id, $run_id);
} }
/** /**
@ -70,7 +48,7 @@ class SocketClientService
*/ */
public function isOnline(int $user_id, $run_id = SERVER_RUN_ID): bool public function isOnline(int $user_id, $run_id = SERVER_RUN_ID): bool
{ {
return (bool)$this->redis->scard(sprintf('%s:%s:%s', self::BIND_USER_TO_FDS, $run_id, $user_id)); return SocketUserBindFds::getInstance()->isOnline($user_id, $run_id);
} }
/** /**
@ -82,13 +60,7 @@ class SocketClientService
*/ */
public function isOnlineAll(int $user_id, array $run_ids = []) public function isOnlineAll(int $user_id, array $run_ids = [])
{ {
$run_ids = $run_ids ?: ServerRunID::getInstance()->getServerRunIdAll(); return SocketUserBindFds::getInstance()->isOnlineAll($user_id, $run_ids);
foreach ($run_ids as $run_id => $time) {
if ($this->isOnline($user_id, $run_id)) return true;
}
return false;
} }
/** /**
@ -100,7 +72,7 @@ class SocketClientService
*/ */
public function findFdUserId(int $fd, $run_id = SERVER_RUN_ID) public function findFdUserId(int $fd, $run_id = SERVER_RUN_ID)
{ {
return $this->redis->hget(sprintf('%s:%s', self::BIND_FD_TO_USER, $run_id), (string)$fd) ?: 0; return SocketFdBindUser::getInstance()->findUserId($fd, $run_id);
} }
/** /**
@ -112,34 +84,6 @@ class SocketClientService
*/ */
public function findUserFds(int $user_id, $run_id = SERVER_RUN_ID) public function findUserFds(int $user_id, $run_id = SERVER_RUN_ID)
{ {
$arr = $this->redis->smembers(sprintf('%s:%s:%s', self::BIND_USER_TO_FDS, $run_id, $user_id)); return SocketUserBindFds::getInstance()->findFds($user_id, $run_id);
return $arr ? array_map(function ($fd) {
return (int)$fd;
}, $arr) : [];
}
/**
* 清除绑定缓存的信息
*
* @param string $run_id 服务运行ID
*/
public function removeRedisCache(string $run_id)
{
$this->redis->del(sprintf('%s:%s', self::BIND_FD_TO_USER, $run_id));
$prefix = sprintf('%s:%s', self::BIND_USER_TO_FDS, $run_id);
ServerRunID::getInstance()->rem($run_id);
$iterator = null;
while (true) {
$keys = $this->redis->scan($iterator, "{$prefix}*");
if ($keys === false) return;
if (!empty($keys)) {
$this->redis->del(...$keys);
}
}
} }
} }