diff --git a/app/Amqp/Consumer/ChatMessageConsumer.php b/app/Amqp/Consumer/ChatMessageConsumer.php index 7506844..efca7ed 100644 --- a/app/Amqp/Consumer/ChatMessageConsumer.php +++ b/app/Amqp/Consumer/ChatMessageConsumer.php @@ -343,7 +343,7 @@ class ChatMessageConsumer extends ConsumerMessage { $server = server(); foreach ($fds as $fd) { - $server->exist($fd) && $server->push($fd, $message); + $server->exist(intval($fd)) && $server->push(intval($fd), $message); } } diff --git a/app/Amqp/Producer/ChatMessageProducer.php b/app/Amqp/Producer/ChatMessageProducer.php index e84dac0..1991f6a 100644 --- a/app/Amqp/Producer/ChatMessageProducer.php +++ b/app/Amqp/Producer/ChatMessageProducer.php @@ -47,7 +47,7 @@ class ChatMessageProducer extends ProducerMessage public function __construct(string $event, array $data, array $options = []) { $message = [ - 'uuid' => $this->uuid(), + 'uuid' => uniqid(), 'event' => $event, 'data' => $data, 'options' => $options @@ -55,14 +55,4 @@ class ChatMessageProducer extends ProducerMessage $this->payload = $message; } - - /** - * 生成唯一的消息ID - * - * @return string - */ - private function uuid() - { - return Str::random(8) . uniqid(); - } } diff --git a/app/Cache/Repository/AbstractRedis.php b/app/Cache/Repository/AbstractRedis.php index d1d7fa6..6bf1eab 100644 --- a/app/Cache/Repository/AbstractRedis.php +++ b/app/Cache/Repository/AbstractRedis.php @@ -26,15 +26,27 @@ abstract class AbstractRedis /** * 获取缓存 KEY * - * @param string $key + * @param string|array $key * @return string */ protected function getCacheKey($key = '') { - return implode(':', array_filter([ - trim($this->prefix, ':'), - trim($this->name, ':'), - trim($key, ':') - ])); + $params = [$this->prefix, $this->name]; + if (is_array($key)) { + $params = array_merge($params, $key); + } else { + $params[] = $key; + } + + return $this->filter($params); + } + + protected function filter(array $params = []) + { + foreach ($params as $k => $param) { + $params[$k] = trim($param, ':'); + } + + return implode(':', array_filter($params)); } } diff --git a/app/Cache/Repository/HashGroupRedis.php b/app/Cache/Repository/HashGroupRedis.php index e392566..9679532 100644 --- a/app/Cache/Repository/HashGroupRedis.php +++ b/app/Cache/Repository/HashGroupRedis.php @@ -4,7 +4,7 @@ namespace App\Cache\Repository; class HashGroupRedis extends AbstractRedis { - protected $prefix = 'rds-hash:multi'; + protected $prefix = 'rds-hash'; protected $name = 'default'; @@ -20,11 +20,11 @@ class HashGroupRedis extends AbstractRedis } /** - * @param string $name - * @param $key + * @param string $name + * @param string|int $key * @return false|string */ - public function get(string $name, $key) + public function get(string $name, string $key) { return $this->redis()->hGet($this->getCacheKey($name), $key); } @@ -77,4 +77,9 @@ class HashGroupRedis extends AbstractRedis { return $this->redis()->hLen($this->getCacheKey($name)); } + + public function delete(string $name) + { + return $this->redis()->del($this->getCacheKey($name)); + } } diff --git a/app/Cache/SocketFdBindUser.php b/app/Cache/SocketFdBindUser.php new file mode 100644 index 0000000..0490db4 --- /dev/null +++ b/app/Cache/SocketFdBindUser.php @@ -0,0 +1,51 @@ +add($run_id, strval($fd), $user_id); + } + + /** + * 解除绑定 + * + * @param string $run_id 服务运行ID(默认当前服务ID) + * @return bool|int + */ + public function unBind(int $fd, $run_id = SERVER_RUN_ID) + { + return $this->rem($run_id, strval($fd)); + } + + /** + * 查询客户端 FD 对应的用户ID + * + * @param int $fd 客户端ID + * @param string $run_id 服务运行ID(默认当前服务ID) + * @return int + */ + public function findUserId(int $fd, $run_id = SERVER_RUN_ID) + { + return (int)$this->get($run_id, strval($fd)) ?: 0; + } +} diff --git a/app/Cache/SocketUserBindFds.php b/app/Cache/SocketUserBindFds.php new file mode 100644 index 0000000..a3eb12c --- /dev/null +++ b/app/Cache/SocketUserBindFds.php @@ -0,0 +1,88 @@ +add($this->filter([$run_id, $user_id]), $fd); + } + + /** + * @param int $fd 客户端ID + * @param string $run_id 服务运行ID(默认当前服务ID) + * @return int + */ + public function unBind(int $fd, int $user_id, $run_id = SERVER_RUN_ID) + { + return $this->rem($this->filter([$run_id, $user_id]), $fd); + } + + /** + * 检测用户当前是否在线(指定运行服务器) + * + * @param int $user_id 用户ID + * @param string $run_id 服务运行ID(默认当前服务ID) + * @return bool + */ + public function isOnline(int $user_id, $run_id = SERVER_RUN_ID): bool + { + return (bool)$this->count($this->filter([$run_id, $user_id])); + } + + /** + * 检测用户当前是否在线(查询所有在线服务器) + * + * @param int $user_id 用户ID + * @param array $run_ids 服务运行ID(默认当前服务ID) + * @return bool + */ + public function isOnlineAll(int $user_id, array $run_ids = []): bool + { + $run_ids = $run_ids ?: ServerRunID::getInstance()->getServerRunIdAll(); + + foreach ($run_ids as $run_id => $time) { + if ($this->isOnline($user_id, $run_id)) return true; + } + + return false; + } + + /** + * 查询用户的客户端fd集合(用户可能存在多端登录) + * + * @param int $user_id 用户ID + * @param string $run_id 服务运行ID(默认当前服务ID) + * @return array + */ + public function findFds(int $user_id, $run_id = SERVER_RUN_ID) + { + $arr = $this->all($this->filter([$run_id, $user_id])); + foreach ($arr as $k => $value) { + $arr[$k] = intval($value); + } + + return $arr; + } + + public function getCachePrefix(string $run_id) + { + return $this->getCacheKey($run_id); + } +} diff --git a/app/Command/RemoveWsCacheCommand.php b/app/Command/RemoveWsCacheCommand.php index 5b40dec..ad7a051 100644 --- a/app/Command/RemoveWsCacheCommand.php +++ b/app/Command/RemoveWsCacheCommand.php @@ -12,10 +12,11 @@ declare(strict_types=1); namespace App\Command; use App\Cache\ServerRunID; +use App\Cache\SocketFdBindUser; +use App\Cache\SocketUserBindFds; use Hyperf\Command\Annotation\Command; use Hyperf\Command\Command as HyperfCommand; use Psr\Container\ContainerInterface; -use App\Service\SocketClientService; /** * @Command @@ -42,17 +43,32 @@ class RemoveWsCacheCommand extends HyperfCommand public function handle() { - $socket = new SocketClientService(); $this->line('此过程可能耗时较长,请耐心等待!', 'info'); // 获取所有已停止运行的服务ID $arr = ServerRunID::getInstance()->getServerRunIdAll(2); foreach ($arr as $run_id => $value) { - go(function () use ($socket, $run_id) { - $socket->removeRedisCache(strval($run_id)); - }); + $this->clear($run_id); } $this->line('缓存已清除!', 'info'); } + + public function clear(string $run_id) + { + ServerRunID::getInstance()->rem($run_id); + SocketFdBindUser::getInstance()->delete($run_id); + + $prefix = SocketUserBindFds::getInstance()->getCachePrefix($run_id); + $iterator = null; + while (true) { + $keys = redis()->scan($iterator, "{$prefix}*", 20); + + if ($keys === false) return; + + if (!empty($keys)) { + redis()->del(...$keys); + } + } + } } diff --git a/app/Command/TestCommand.php b/app/Command/TestCommand.php index 50603f2..9e20570 100644 --- a/app/Command/TestCommand.php +++ b/app/Command/TestCommand.php @@ -15,7 +15,9 @@ use App\Cache\Repository\SetRedis; use App\Cache\Repository\StreamRedis; use App\Cache\Repository\StringRedis; use App\Cache\Repository\ZSetRedis; +use App\Cache\SocketFdBindUser; use App\Cache\SocketRoom; +use App\Cache\SocketUserBindFds; use App\Cache\UnreadTalk; use App\Service\TalkService; use Hyperf\Command\Command as HyperfCommand; @@ -134,10 +136,12 @@ class TestCommand extends HyperfCommand //$socketRoom = SocketRoom::getInstance(); //$socketRoom->addRoomMember(''); - //$keys = redis()->keys('rds-set*'); //foreach ($keys as $key) { // redis()->del($keys); //} + + //SocketFdBindUser::getInstance()->bind(1, 2054); + //SocketUserBindFds::getInstance()->bind(1, 2054); } } diff --git a/app/Service/SocketClientService.php b/app/Service/SocketClientService.php index 9d6dab9..b15d099 100644 --- a/app/Service/SocketClientService.php +++ b/app/Service/SocketClientService.php @@ -2,8 +2,8 @@ namespace App\Service; -use App\Cache\ServerRunID; -use Hyperf\Redis\Redis; +use App\Cache\SocketFdBindUser; +use App\Cache\SocketUserBindFds; /** * Socket客户端ID服务 @@ -12,26 +12,6 @@ use Hyperf\Redis\Redis; */ class SocketClientService { - /** - * fd与用户绑定(使用hash 做处理) - */ - const BIND_FD_TO_USER = 'ws:fd:user'; - - /** - * 使用集合做处理 - */ - const BIND_USER_TO_FDS = 'ws:user:fds'; - - /** - * @var Redis - */ - private $redis; - - public function __construct() - { - $this->redis = container()->get(Redis::class); - } - /** * 客户端fd与用户ID绑定关系 * @@ -41,10 +21,8 @@ class SocketClientService */ public function bindRelation(int $fd, int $user_id, $run_id = SERVER_RUN_ID) { - $this->redis->multi(); - $this->redis->hSet(sprintf('%s:%s', self::BIND_FD_TO_USER, $run_id), (string)$fd, (string)$user_id); - $this->redis->sadd(sprintf('%s:%s:%s', self::BIND_USER_TO_FDS, $run_id, $user_id), $fd); - $this->redis->exec(); + SocketFdBindUser::getInstance()->bind($fd, $user_id, $run_id); + SocketUserBindFds::getInstance()->bind($fd, $user_id, $run_id); } /** @@ -55,10 +33,10 @@ class SocketClientService */ public function removeRelation(int $fd, $run_id = SERVER_RUN_ID) { - $user_id = $this->findFdUserId($fd) | 0; + $user_id = $this->findFdUserId($fd); - $this->redis->hdel(sprintf('%s:%s', self::BIND_FD_TO_USER, $run_id), (string)$fd); - $this->redis->srem(sprintf('%s:%s:%s', self::BIND_USER_TO_FDS, $run_id, $user_id), $fd); + SocketFdBindUser::getInstance()->unBind($fd, $run_id); + SocketUserBindFds::getInstance()->unBind($fd, $user_id, $run_id); } /** @@ -70,7 +48,7 @@ class SocketClientService */ public function isOnline(int $user_id, $run_id = SERVER_RUN_ID): bool { - return (bool)$this->redis->scard(sprintf('%s:%s:%s', self::BIND_USER_TO_FDS, $run_id, $user_id)); + return SocketUserBindFds::getInstance()->isOnline($user_id, $run_id); } /** @@ -82,13 +60,7 @@ class SocketClientService */ public function isOnlineAll(int $user_id, array $run_ids = []) { - $run_ids = $run_ids ?: ServerRunID::getInstance()->getServerRunIdAll(); - - foreach ($run_ids as $run_id => $time) { - if ($this->isOnline($user_id, $run_id)) return true; - } - - return false; + return SocketUserBindFds::getInstance()->isOnlineAll($user_id, $run_ids); } /** @@ -100,7 +72,7 @@ class SocketClientService */ public function findFdUserId(int $fd, $run_id = SERVER_RUN_ID) { - return $this->redis->hget(sprintf('%s:%s', self::BIND_FD_TO_USER, $run_id), (string)$fd) ?: 0; + return SocketFdBindUser::getInstance()->findUserId($fd, $run_id); } /** @@ -112,34 +84,6 @@ class SocketClientService */ public function findUserFds(int $user_id, $run_id = SERVER_RUN_ID) { - $arr = $this->redis->smembers(sprintf('%s:%s:%s', self::BIND_USER_TO_FDS, $run_id, $user_id)); - return $arr ? array_map(function ($fd) { - return (int)$fd; - }, $arr) : []; - } - - /** - * 清除绑定缓存的信息 - * - * @param string $run_id 服务运行ID - */ - public function removeRedisCache(string $run_id) - { - $this->redis->del(sprintf('%s:%s', self::BIND_FD_TO_USER, $run_id)); - - $prefix = sprintf('%s:%s', self::BIND_USER_TO_FDS, $run_id); - - ServerRunID::getInstance()->rem($run_id); - - $iterator = null; - while (true) { - $keys = $this->redis->scan($iterator, "{$prefix}*"); - - if ($keys === false) return; - - if (!empty($keys)) { - $this->redis->del(...$keys); - } - } + return SocketUserBindFds::getInstance()->findFds($user_id, $run_id); } }