diff --git a/app/Amqp/Consumer/ChatMessageConsumer.php b/app/Amqp/Consumer/ChatMessageConsumer.php index 226aaf9..7506844 100644 --- a/app/Amqp/Consumer/ChatMessageConsumer.php +++ b/app/Amqp/Consumer/ChatMessageConsumer.php @@ -26,9 +26,9 @@ use App\Model\Chat\ChatRecordsInvite; use App\Model\Chat\ChatRecordsForward; use App\Model\Group\Group; use App\Service\SocketClientService; -use App\Service\SocketRoomService; use App\Constants\SocketConstants; use App\Cache\Repository\LockRedis; +use App\Cache\SocketRoom; /** * 消息推送消费者队列 @@ -62,11 +62,6 @@ class ChatMessageConsumer extends ConsumerMessage */ private $socketClientService; - /** - * @var SocketRoomService - */ - private $socketRoomService; - /** * 消息事件与回调事件绑定 * @@ -93,12 +88,10 @@ class ChatMessageConsumer extends ConsumerMessage * ChatMessageConsumer constructor. * * @param SocketClientService $socketClientService - * @param SocketRoomService $socketRoomService */ - public function __construct(SocketClientService $socketClientService, SocketRoomService $socketRoomService) + public function __construct(SocketClientService $socketClientService) { $this->socketClientService = $socketClientService; - $this->socketRoomService = $socketRoomService; // 动态设置 Rabbit MQ 消费队列名称 $this->setQueue('queue:im_message:' . SERVER_RUN_ID); @@ -154,7 +147,7 @@ class ChatMessageConsumer extends ConsumerMessage if ($source == 1) {// 私聊 $fds = array_merge($fds, $this->socketClientService->findUserFds($data['data']['receive'])); } else if ($source == 2) {// 群聊 - $userIds = $this->socketRoomService->getRoomMembers(strval($data['data']['receive'])); + $userIds = SocketRoom::getInstance()->getRoomMembers(strval($data['data']['receive'])); foreach ($userIds as $uid) { $fds = array_merge($fds, $this->socketClientService->findUserFds((int)$uid)); } @@ -305,7 +298,7 @@ class ChatMessageConsumer extends ConsumerMessage $fds = array_merge($fds, $this->socketClientService->findUserFds((int)$record->user_id)); $fds = array_merge($fds, $this->socketClientService->findUserFds((int)$record->receive_id)); } else if ($record->source == 2) { - $userIds = $this->socketRoomService->getRoomMembers(strval($record->receive_id)); + $userIds = SocketRoom::getInstance()->getRoomMembers(strval($record->receive_id)); foreach ($userIds as $uid) { $fds = array_merge($fds, $this->socketClientService->findUserFds((int)$uid)); } diff --git a/app/Bootstrap/ServerStart.php b/app/Bootstrap/ServerStart.php index a17eeaa..5e34267 100644 --- a/app/Bootstrap/ServerStart.php +++ b/app/Bootstrap/ServerStart.php @@ -10,9 +10,9 @@ namespace App\Bootstrap; +use App\Cache\ServerRunID; use Hyperf\Framework\Bootstrap\ServerStartCallback; use Swoole\Timer; -use Hyperf\Redis\Redis; /** * 自定义服务启动前回调事件 @@ -37,6 +37,6 @@ class ServerStart extends ServerStartCallback */ public function setRunIdTime() { - container()->get(Redis::class)->hset('SERVER_RUN_ID', SERVER_RUN_ID, time()); + ServerRunID::getInstance()->add(SERVER_RUN_ID, time()); } } diff --git a/app/Cache/Repository/AbstractRedis.php b/app/Cache/Repository/AbstractRedis.php new file mode 100644 index 0000000..d1d7fa6 --- /dev/null +++ b/app/Cache/Repository/AbstractRedis.php @@ -0,0 +1,40 @@ +prefix, ':'), + trim($this->name, ':'), + trim($key, ':') + ])); + } +} diff --git a/app/Cache/Repository/HashGroupRedis.php b/app/Cache/Repository/HashGroupRedis.php new file mode 100644 index 0000000..e392566 --- /dev/null +++ b/app/Cache/Repository/HashGroupRedis.php @@ -0,0 +1,80 @@ +redis()->hSet($this->getCacheKey($name), $key, $value); + } + + /** + * @param string $name + * @param $key + * @return false|string + */ + public function get(string $name, $key) + { + return $this->redis()->hGet($this->getCacheKey($name), $key); + } + + /** + * @param string $name + * @return array + */ + public function getAll(string $name) + { + return $this->redis()->hGetAll($this->getCacheKey($name)); + } + + /** + * @param string $name + * @param string $key + * @return bool|int + */ + public function rem(string $name, string $key) + { + return $this->redis()->hDel($this->getCacheKey($name), $key); + } + + /** + * @param string $name + * @param string $key + * @param int $value + * @return int + */ + public function incr(string $name, string $key, int $value = 1) + { + return $this->redis()->hIncrBy($this->getCacheKey($name), $key, $value); + } + + /** + * @param string $name + * @param string $key + * @return bool + */ + public function isMember(string $name, string $key) + { + return $this->redis()->hExists($this->getCacheKey($name), $key); + } + + /** + * @param string $name + * @return false|int + */ + public function count(string $name) + { + return $this->redis()->hLen($this->getCacheKey($name)); + } +} diff --git a/app/Cache/Repository/HashRedis.php b/app/Cache/Repository/HashRedis.php index 7f1d9ad..683b4cf 100644 --- a/app/Cache/Repository/HashRedis.php +++ b/app/Cache/Repository/HashRedis.php @@ -9,13 +9,11 @@ use App\Cache\Contracts\HashRedisInterface; * * @package App\Cache\Repository */ -class HashRedis implements HashRedisInterface +class HashRedis extends AbstractRedis implements HashRedisInterface { - use RedisTrait; + protected $prefix = 'rds-hash'; - private $prefix = 'rds:hash'; - - public $name = 'default'; + protected $name = 'default'; /** * 获取 Hash 值 @@ -26,10 +24,10 @@ class HashRedis implements HashRedisInterface public function get(string ...$key) { if (func_num_args() == 1) { - return (string)$this->redis()->hGet($this->getKeyName(), $key[0]); + return (string)$this->redis()->hGet($this->getCacheKey(), $key[0]); } - return $this->redis()->hMGet($this->getKeyName(), $key); + return $this->redis()->hMGet($this->getCacheKey(), $key); } /** @@ -40,7 +38,7 @@ class HashRedis implements HashRedisInterface */ public function add(string $key, $value) { - $this->redis()->hSet($this->getKeyName(), $key, $value); + $this->redis()->hSet($this->getCacheKey(), $key, $value); } /** @@ -51,7 +49,7 @@ class HashRedis implements HashRedisInterface */ public function rem(string ...$key) { - return $this->redis()->hDel($this->getKeyName(), ...$key); + return $this->redis()->hDel($this->getCacheKey(), ...$key); } /** @@ -63,7 +61,7 @@ class HashRedis implements HashRedisInterface */ public function incr(string $member, int $score) { - return $this->redis()->hincrby($this->getKeyName(), $member, $score); + return $this->redis()->hincrby($this->getCacheKey(), $member, $score); } /** @@ -73,7 +71,7 @@ class HashRedis implements HashRedisInterface */ public function count() { - return (int)$this->redis()->hLen($this->getKeyName()); + return (int)$this->redis()->hLen($this->getCacheKey()); } /** @@ -83,7 +81,7 @@ class HashRedis implements HashRedisInterface */ public function all() { - return $this->redis()->hGetAll($this->getKeyName()); + return $this->redis()->hGetAll($this->getCacheKey()); } /** @@ -94,7 +92,7 @@ class HashRedis implements HashRedisInterface */ public function isMember(string $key) { - return $this->redis()->hExists($this->getKeyName(), $key); + return $this->redis()->hExists($this->getCacheKey(), $key); } /** @@ -104,6 +102,6 @@ class HashRedis implements HashRedisInterface */ public function delete() { - return (bool)$this->redis()->del($this->getKeyName()); + return (bool)$this->redis()->del($this->getCacheKey()); } } diff --git a/app/Cache/Repository/ListRedis.php b/app/Cache/Repository/ListRedis.php index 8e1fc88..fdecaa4 100644 --- a/app/Cache/Repository/ListRedis.php +++ b/app/Cache/Repository/ListRedis.php @@ -9,13 +9,11 @@ use App\Cache\Contracts\ListRedisInterface; * * @package App\Cache\Repository */ -class ListRedis implements ListRedisInterface +class ListRedis extends AbstractRedis implements ListRedisInterface { - use RedisTrait; + protected $prefix = 'rds-list'; - private $prefix = 'rds:list'; - - public $name = 'default'; + protected $name = 'default'; /** * Push 队列任务 @@ -25,7 +23,7 @@ class ListRedis implements ListRedisInterface */ public function push(string ...$value) { - return $this->redis()->lPush($this->getKeyName(), ...$value); + return $this->redis()->lPush($this->getCacheKey(), ...$value); } /** @@ -35,7 +33,7 @@ class ListRedis implements ListRedisInterface */ public function pop() { - return $this->redis()->rPop($this->getKeyName()); + return $this->redis()->rPop($this->getCacheKey()); } /** @@ -45,7 +43,7 @@ class ListRedis implements ListRedisInterface */ public function count() { - return (int)$this->redis()->lLen($this->getKeyName()); + return (int)$this->redis()->lLen($this->getCacheKey()); } /** @@ -55,7 +53,7 @@ class ListRedis implements ListRedisInterface */ public function clear() { - return $this->redis()->lTrim($this->getKeyName(), 1, 0); + return $this->redis()->lTrim($this->getCacheKey(), 1, 0); } /** @@ -65,7 +63,7 @@ class ListRedis implements ListRedisInterface */ public function all() { - return $this->redis()->lRange($this->getKeyName(), 0, -1); + return $this->redis()->lRange($this->getCacheKey(), 0, -1); } /** @@ -75,6 +73,6 @@ class ListRedis implements ListRedisInterface */ public function delete() { - return (bool)$this->redis()->del($this->getKeyName()); + return (bool)$this->redis()->del($this->getCacheKey()); } } diff --git a/app/Cache/Repository/LockRedis.php b/app/Cache/Repository/LockRedis.php index bc50c8f..00d4cbc 100644 --- a/app/Cache/Repository/LockRedis.php +++ b/app/Cache/Repository/LockRedis.php @@ -9,13 +9,11 @@ use App\Cache\Contracts\LockRedisInterface; * * @package App\Cache\Repository */ -class LockRedis implements LockRedisInterface +class LockRedis extends AbstractRedis implements LockRedisInterface { - use RedisTrait; + protected $prefix = 'rds-lock'; - private $prefix = 'rds:lock'; - - private $lockValue = 1; + protected $lockValue = 1; /** * 获取是毫秒时间戳 diff --git a/app/Cache/Repository/RedisTrait.php b/app/Cache/Repository/RedisTrait.php deleted file mode 100644 index 5771fb2..0000000 --- a/app/Cache/Repository/RedisTrait.php +++ /dev/null @@ -1,63 +0,0 @@ -prefix, ':'), trim($key, ':')); - } - - /** - * 获取缓存 KEY - * - * @return string - */ - protected function getKeyName() - { - return $this->getCacheKey($this->name); - } - - /** - * 加载数据到缓存 - */ - public function reload() - { - - } -} diff --git a/app/Cache/Repository/SetGroupRedis.php b/app/Cache/Repository/SetGroupRedis.php new file mode 100644 index 0000000..4966fe4 --- /dev/null +++ b/app/Cache/Repository/SetGroupRedis.php @@ -0,0 +1,90 @@ +redis()->sAdd($this->getCacheKey($name), ...$member); + } + + /** + * 删除成员 + * + * @param string $name 分组名 + * @param string|int ...$member 分组成员 + * @return int + */ + public function rem(string $name, ...$member) + { + return $this->redis()->sRem($this->getCacheKey($name), ...$member); + } + + /** + * 判断成员是否存在 + * + * @param string $name 分组名 + * @param string $key 分组成员 + * @return bool + */ + public function isMember(string $name, string $key) + { + return $this->redis()->sIsMember($this->getCacheKey($name), $key); + } + + /** + * 获取分组中所有成员 + * + * @param string $name 分组名 + * @return array + */ + public function all(string $name) + { + return $this->redis()->sMembers($this->getCacheKey($name)); + } + + /** + * 获取分组中成员数量 + * + * @param string $name 分组名 + * @return int + */ + public function count(string $name) + { + return $this->redis()->sCard($this->getCacheKey($name)); + } + + /** + * 删除分组 + * + * @param string $name 分组名 + * @return int + */ + public function delete(string $name) + { + return $this->redis()->del($this->getCacheKey($name)); + } + + /** + * 获取随机集合中的元素 + * + * @param int $count + * @return array|bool|mixed|string + */ + public function randMember(string $name, $count = 1) + { + return $this->redis()->sRandMember($this->getCacheKey($name), $count); + } +} diff --git a/app/Cache/Repository/SetRedis.php b/app/Cache/Repository/SetRedis.php index 010acd5..a3cb62d 100644 --- a/app/Cache/Repository/SetRedis.php +++ b/app/Cache/Repository/SetRedis.php @@ -1,8 +1,8 @@ redis()->sAdd($this->getKeyName(), ...$member); + return $this->redis()->sAdd($this->getCacheKey(), ...$member); } /** @@ -37,7 +35,7 @@ class SetRedis implements SetRedisInterface */ public function rem(string ...$member) { - return $this->redis()->sRem($this->getKeyName(), ...$member); + return $this->redis()->sRem($this->getCacheKey(), ...$member); } /** @@ -48,7 +46,7 @@ class SetRedis implements SetRedisInterface */ public function isMember(string $member) { - return $this->redis()->sIsMember($this->getKeyName(), $member); + return $this->redis()->sIsMember($this->getCacheKey(), $member); } /** @@ -58,7 +56,7 @@ class SetRedis implements SetRedisInterface */ public function all() { - return $this->redis()->sMembers($this->getKeyName()); + return $this->redis()->sMembers($this->getCacheKey()); } /** @@ -68,7 +66,7 @@ class SetRedis implements SetRedisInterface */ public function count() { - return $this->redis()->scard($this->getKeyName()); + return $this->redis()->scard($this->getCacheKey()); } /** @@ -79,7 +77,7 @@ class SetRedis implements SetRedisInterface */ public function randMember($count = 1) { - return $this->redis()->sRandMember($this->getKeyName(), $count); + return $this->redis()->sRandMember($this->getCacheKey(), $count); } /** @@ -89,6 +87,6 @@ class SetRedis implements SetRedisInterface */ public function delete() { - return (bool)$this->redis()->del($this->getKeyName()); + return (bool)$this->redis()->del($this->getCacheKey()); } } diff --git a/app/Cache/Repository/StreamRedis.php b/app/Cache/Repository/StreamRedis.php index 1ec98a6..d3d6514 100644 --- a/app/Cache/Repository/StreamRedis.php +++ b/app/Cache/Repository/StreamRedis.php @@ -3,15 +3,14 @@ namespace App\Cache\Repository; use Closure; +use App\Traits\StaticInstance; use App\Cache\Contracts\StreamRedisInterface; -class StreamRedis implements StreamRedisInterface +class StreamRedis extends AbstractRedis implements StreamRedisInterface { - use RedisTrait; + protected $prefix = 'rds-stream'; - private $prefix = 'rds:stream'; - - public $name = 'default'; + protected $name = 'default'; /** * 添加消息 @@ -23,7 +22,7 @@ class StreamRedis implements StreamRedisInterface */ public function add(array $messages, $maxLen = 0, $isApproximate = false) { - return $this->redis()->xAdd($this->getKeyName(), '*', $messages, $maxLen, $isApproximate); + return $this->redis()->xAdd($this->getCacheKey(), '*', $messages, $maxLen, $isApproximate); } /** @@ -34,7 +33,7 @@ class StreamRedis implements StreamRedisInterface */ public function rem(string ...$id) { - return $this->redis()->xDel($this->getKeyName(), $id); + return $this->redis()->xDel($this->getCacheKey(), $id); } /** @@ -46,7 +45,7 @@ class StreamRedis implements StreamRedisInterface */ public function ack(string $group, string $id) { - return $this->redis()->xAck($this->getKeyName(), $group, [$id]); + return $this->redis()->xAck($this->getCacheKey(), $group, [$id]); } /** @@ -56,7 +55,7 @@ class StreamRedis implements StreamRedisInterface */ public function count() { - return $this->redis()->xLen($this->getKeyName()); + return $this->redis()->xLen($this->getCacheKey()); } /** @@ -66,7 +65,7 @@ class StreamRedis implements StreamRedisInterface */ public function all() { - return $this->redis()->xRange($this->getKeyName(), '-', '+'); + return $this->redis()->xRange($this->getCacheKey(), '-', '+'); } /** @@ -90,7 +89,7 @@ class StreamRedis implements StreamRedisInterface */ public function delete() { - return $this->redis()->del($this->getKeyName()); + return $this->redis()->del($this->getCacheKey()); } /** @@ -102,17 +101,17 @@ class StreamRedis implements StreamRedisInterface */ public function trim(int $maxLen, bool $isApproximate = false) { - return $this->redis()->xTrim($this->getKeyName(), $maxLen, $isApproximate); + return $this->redis()->xTrim($this->getCacheKey(), $maxLen, $isApproximate); } public function group($operation, $group, $msgId = '', $mkStream = false) { - return $this->redis()->xGroup($operation, $this->getKeyName(), $group, $msgId, $mkStream); + return $this->redis()->xGroup($operation, $this->getCacheKey(), $group, $msgId, $mkStream); } public function pending($group, $start = null, $end = null, $count = null, $consumer = null) { - return $this->redis()->xPending($this->getKeyName(), $group, $start, $end, $count, $consumer); + return $this->redis()->xPending($this->getCacheKey(), $group, $start, $end, $count, $consumer); } /** @@ -123,7 +122,7 @@ class StreamRedis implements StreamRedisInterface */ public function info(string $operation = 'stream') { - return $this->redis()->xInfo($operation, $this->getKeyName()); + return $this->redis()->xInfo($operation, $this->getCacheKey()); } /** @@ -139,13 +138,13 @@ class StreamRedis implements StreamRedisInterface $this->group('create', $group, '0'); while (true) { - $tasks = $this->redis()->xReadGroup($group, $consumer, [$this->getKeyName() => '>'], $count); + $tasks = $this->redis()->xReadGroup($group, $consumer, [$this->getCacheKey() => '>'], $count); if (empty($tasks)) { sleep(1);// 获取不到任务,延时一秒 continue; } - foreach ($tasks[$this->getKeyName()] as $id => $task) { + foreach ($tasks[$this->getCacheKey()] as $id => $task) { if ($closure($id, $task)) { $this->ack($group, $id); } diff --git a/app/Cache/Repository/StringRedis.php b/app/Cache/Repository/StringRedis.php index 5b60e39..c0431e1 100644 --- a/app/Cache/Repository/StringRedis.php +++ b/app/Cache/Repository/StringRedis.php @@ -2,6 +2,7 @@ namespace App\Cache\Repository; +use App\Traits\StaticInstance; use App\Cache\Contracts\StringRedisInterface; /** @@ -9,11 +10,11 @@ use App\Cache\Contracts\StringRedisInterface; * * @package App\Cache\Repository */ -class StringRedis implements StringRedisInterface +class StringRedis extends AbstractRedis implements StringRedisInterface { - use RedisTrait; + protected $prefix = 'rds-string'; - private $prefix = 'rds:string'; + protected $name = ''; /** * 设置缓存 diff --git a/app/Cache/Repository/ZSetRedis.php b/app/Cache/Repository/ZSetRedis.php index a42d371..5937d23 100644 --- a/app/Cache/Repository/ZSetRedis.php +++ b/app/Cache/Repository/ZSetRedis.php @@ -9,13 +9,11 @@ use App\Cache\Contracts\ZSetRedisInterface; * * @package App\Cache\Repository */ -class ZSetRedis implements ZSetRedisInterface +class ZSetRedis extends AbstractRedis implements ZSetRedisInterface { - use RedisTrait; + protected $prefix = 'rds-zset'; - private $prefix = 'rds:zset'; - - public $name = 'default'; + protected $name = 'default'; /** * 添加有序集合元素 @@ -26,7 +24,7 @@ class ZSetRedis implements ZSetRedisInterface */ public function add(string $member, float $score) { - return $this->redis()->zAdd($this->getKeyName(), $score, $member); + return $this->redis()->zAdd($this->getCacheKey(), $score, $member); } /** @@ -37,7 +35,7 @@ class ZSetRedis implements ZSetRedisInterface */ public function rem(string ...$member) { - return $this->redis()->zRem($this->getKeyName(), ...$member); + return $this->redis()->zRem($this->getCacheKey(), ...$member); } /** @@ -49,7 +47,7 @@ class ZSetRedis implements ZSetRedisInterface */ public function incr(string $member, float $score) { - return $this->redis()->zIncrBy($this->getKeyName(), $score, $member); + return $this->redis()->zIncrBy($this->getCacheKey(), $score, $member); } /** @@ -59,7 +57,7 @@ class ZSetRedis implements ZSetRedisInterface */ public function count() { - return $this->redis()->zCard($this->getKeyName()); + return $this->redis()->zCard($this->getCacheKey()); } /** @@ -71,7 +69,7 @@ class ZSetRedis implements ZSetRedisInterface */ public function all($asc = true, $is_score = true) { - return $this->redis()->{$asc ? 'zRevRange' : 'zRange'}($this->getKeyName(), 0, -1, $is_score); + return $this->redis()->{$asc ? 'zRevRange' : 'zRange'}($this->getCacheKey(), 0, -1, $is_score); } /** @@ -88,7 +86,7 @@ class ZSetRedis implements ZSetRedisInterface [$start, $end] = $asc ? ['+inf', '-inf'] : ['-inf', '+inf']; - $rows = $this->redis()->{$asc ? 'zRevRangeByScore' : 'zRangeByScore'}($this->getKeyName(), $start, $end, [ + $rows = $this->redis()->{$asc ? 'zRevRangeByScore' : 'zRangeByScore'}($this->getCacheKey(), $start, $end, [ 'withscores' => true, 'limit' => [($page - 1) * $size, $size] ]); @@ -120,7 +118,7 @@ class ZSetRedis implements ZSetRedisInterface */ public function range(string $start, string $end, array $options = []) { - return $this->redis()->zRangeByScore($this->getKeyName(), $start, $end, $options); + return $this->redis()->zRangeByScore($this->getCacheKey(), $start, $end, $options); } /** @@ -132,7 +130,7 @@ class ZSetRedis implements ZSetRedisInterface */ public function getMemberRank(string $member, $asc = true) { - return $this->redis()->{$asc ? 'zRevRank' : 'zRank'}($this->getKeyName(), $member) + 1; + return $this->redis()->{$asc ? 'zRevRank' : 'zRank'}($this->getCacheKey(), $member) + 1; } /** @@ -143,7 +141,7 @@ class ZSetRedis implements ZSetRedisInterface */ public function getMemberScore(string $member) { - return $this->redis()->zScore($this->getKeyName(), $member); + return $this->redis()->zScore($this->getCacheKey(), $member); } /** @@ -164,6 +162,6 @@ class ZSetRedis implements ZSetRedisInterface */ public function delete() { - return (bool)$this->redis()->del($this->getKeyName()); + return (bool)$this->redis()->del($this->getCacheKey()); } } diff --git a/app/Cache/ServerRunID.php b/app/Cache/ServerRunID.php new file mode 100644 index 0000000..ac95a2b --- /dev/null +++ b/app/Cache/ServerRunID.php @@ -0,0 +1,39 @@ +all(); + + if ($type == 3) return $arr; + + $current_time = time(); + return array_filter($arr, function ($value) use ($current_time, $type) { + if ($type == 1) { + return ($current_time - intval($value)) <= self::RUN_OVERTIME; + } else { + return ($current_time - intval($value)) > self::RUN_OVERTIME; + } + }); + } +} diff --git a/app/Cache/SocketRoom.php b/app/Cache/SocketRoom.php new file mode 100644 index 0000000..0768201 --- /dev/null +++ b/app/Cache/SocketRoom.php @@ -0,0 +1,60 @@ +getCacheKey($room); + } + + /** + * 获取房间中所有的用户ID + * + * @param string $room 房间名 + * @return array + */ + public function getRoomMembers(string $room) + { + return $this->all($room); + } + + /** + * 添加房间成员 + * + * @param string $room 房间名 + * @param string ...$member 用户ID + * @return bool|int + */ + public function addRoomMember(string $room, string ...$member) + { + return $this->add($room, ...$member); + } + + public function delRoomMember($room, string ...$member) + { + return $this->rem($room, ...$member); + } + + /** + * 删除房间 + * + * @param string|int $room 房间名 + * @return int + */ + public function delRoom($room) + { + return $this->delete($room); + } +} diff --git a/app/Cache/UnreadTalk.php b/app/Cache/UnreadTalk.php index 754c4d3..bb4d19f 100644 --- a/app/Cache/UnreadTalk.php +++ b/app/Cache/UnreadTalk.php @@ -44,7 +44,7 @@ class UnreadTalk extends HashRedis */ public function reset(int $sender, int $receive) { - $this->add($this->flag($sender, $receive), 0); + $this->rem($this->flag($sender, $receive)); } /** @@ -67,7 +67,7 @@ class UnreadTalk extends HashRedis { $iterator = null; $arr = []; - while ($elements = $this->redis()->hscan($this->getKeyName(), $iterator, '*_' . $user_id, 20)) { + while ($elements = $this->redis()->hscan($this->getCacheKey(), $iterator, '*_' . $user_id, 20)) { foreach ($elements as $key => $value) { $arr[explode('_', $key)[0]] = $value; } diff --git a/app/Command/RemoveWsCacheCommand.php b/app/Command/RemoveWsCacheCommand.php index 708f0b0..5b40dec 100644 --- a/app/Command/RemoveWsCacheCommand.php +++ b/app/Command/RemoveWsCacheCommand.php @@ -11,6 +11,7 @@ declare(strict_types=1); namespace App\Command; +use App\Cache\ServerRunID; use Hyperf\Command\Annotation\Command; use Hyperf\Command\Command as HyperfCommand; use Psr\Container\ContainerInterface; @@ -45,8 +46,7 @@ class RemoveWsCacheCommand extends HyperfCommand $this->line('此过程可能耗时较长,请耐心等待!', 'info'); // 获取所有已停止运行的服务ID - $arr = $socket->getServerRunIdAll(2); - + $arr = ServerRunID::getInstance()->getServerRunIdAll(2); foreach ($arr as $run_id => $value) { go(function () use ($socket, $run_id) { $socket->removeRedisCache(strval($run_id)); diff --git a/app/Command/TestCommand.php b/app/Command/TestCommand.php index 05661cf..50603f2 100644 --- a/app/Command/TestCommand.php +++ b/app/Command/TestCommand.php @@ -9,10 +9,13 @@ use App\Cache\LastMessage; use App\Cache\Repository\HashRedis; use App\Cache\Repository\ListRedis; use App\Cache\Repository\LockRedis; +use App\Cache\Repository\HashGroupRedis; +use App\Cache\Repository\SetGroupRedis; use App\Cache\Repository\SetRedis; use App\Cache\Repository\StreamRedis; use App\Cache\Repository\StringRedis; use App\Cache\Repository\ZSetRedis; +use App\Cache\SocketRoom; use App\Cache\UnreadTalk; use App\Service\TalkService; use Hyperf\Command\Command as HyperfCommand; @@ -53,7 +56,7 @@ class TestCommand extends HyperfCommand //var_dump($string->ttl('yuandong')); //var_dump($string->isExist('yuandong')); - $hash = HashRedis::getInstance(); + //$hash = HashRedis::getInstance(); //for ($i = 0; $i < 10; $i++) { // $hash->add('user:' . $i, (string)rand(0, 100)); //} @@ -128,5 +131,13 @@ class TestCommand extends HyperfCommand //$model->talks(2054); //var_dump(FriendRemark::getInstance()->read(2054,2055)); + + //$socketRoom = SocketRoom::getInstance(); + //$socketRoom->addRoomMember(''); + + //$keys = redis()->keys('rds-set*'); + //foreach ($keys as $key) { + // redis()->del($keys); + //} } } diff --git a/app/Controller/Api/V1/ContactsController.php b/app/Controller/Api/V1/ContactsController.php index c753203..36e88fc 100644 --- a/app/Controller/Api/V1/ContactsController.php +++ b/app/Controller/Api/V1/ContactsController.php @@ -12,6 +12,7 @@ namespace App\Controller\Api\V1; use App\Cache\FriendApply; use App\Cache\FriendRemark; +use App\Cache\ServerRunID; use App\Model\UsersFriendsApply; use Hyperf\Di\Annotation\Inject; use Hyperf\HttpServer\Annotation\Controller; @@ -59,7 +60,7 @@ class ContactsController extends CController { $rows = $this->contactsService->getContacts($this->uid()); if ($rows) { - $runArr = $this->socketClientService->getServerRunIdAll(); + $runArr = ServerRunID::getInstance()->getServerRunIdAll(); foreach ($rows as $k => $row) { // 查询用户当前是否在线 $rows[$k]['online'] = $this->socketClientService->isOnlineAll($row['id'], $runArr); diff --git a/app/Controller/Api/V1/GroupController.php b/app/Controller/Api/V1/GroupController.php index 531444b..548eb4c 100644 --- a/app/Controller/Api/V1/GroupController.php +++ b/app/Controller/Api/V1/GroupController.php @@ -10,6 +10,7 @@ namespace App\Controller\Api\V1; +use App\Cache\SocketRoom; use Hyperf\Di\Annotation\Inject; use Hyperf\HttpServer\Annotation\Controller; use Hyperf\HttpServer\Annotation\RequestMapping; @@ -21,7 +22,6 @@ use App\Model\Group\Group; use App\Model\Group\GroupMember; use App\Model\Group\GroupNotice; use App\Amqp\Producer\ChatMessageProducer; -use App\Service\SocketRoomService; use App\Service\GroupService; use App\Constants\SocketConstants; use Psr\Http\Message\ResponseInterface; @@ -41,12 +41,6 @@ class GroupController extends CController */ private $groupService; - /** - * @Inject - * @var SocketRoomService - */ - private $socketRoomService; - /** * 创建群组 * @RequestMapping(path="create", methods="post") @@ -77,7 +71,7 @@ class GroupController extends CController // 加入聊天室 $friend_ids[] = $user_id; foreach ($friend_ids as $uid) { - $this->socketRoomService->addRoomMember($uid, $data['group_id']); + SocketRoom::getInstance()->addRoomMember(strval($data['group_id']), $uid); } // ... 消息推送队列 @@ -111,7 +105,7 @@ class GroupController extends CController return $this->response->fail('群组解散失败!'); } - $this->socketRoomService->delRoom($params['group_id']); + SocketRoom::getInstance()->delRoom($params['group_id']); // ... TODO 推送群消息(预留) @@ -142,7 +136,7 @@ class GroupController extends CController // 加入聊天室 foreach ($uids as $uid) { - $this->socketRoomService->addRoomMember($uid, $params['group_id']); + SocketRoom::getInstance()->addRoomMember($params['group_id'], $uid); } // 消息推送队列 @@ -176,7 +170,7 @@ class GroupController extends CController } // 移出聊天室 - $this->socketRoomService->delRoomMember($params['group_id'], $user_id); + SocketRoom::getInstance()->delRoomMember($params['group_id'], $user_id); // 消息推送队列 push_amqp(new ChatMessageProducer(SocketConstants::EVENT_TALK, [ @@ -246,7 +240,7 @@ class GroupController extends CController // 移出聊天室 foreach ($params['members_ids'] as $uid) { - $this->socketRoomService->delRoomMember($params['group_id'], $uid); + SocketRoom::getInstance()->delRoomMember($params['group_id'], strval($uid)); } // 消息推送队列 diff --git a/app/Controller/WebSocketController.php b/app/Controller/WebSocketController.php index 91d6e3a..576ef33 100644 --- a/app/Controller/WebSocketController.php +++ b/app/Controller/WebSocketController.php @@ -11,6 +11,7 @@ declare(strict_types=1); namespace App\Controller; +use App\Cache\SocketRoom; use Hyperf\Di\Annotation\Inject; use App\Constants\SocketConstants; use Hyperf\Contract\OnCloseInterface; @@ -23,7 +24,6 @@ use Swoole\WebSocket\Server; use Phper666\JWTAuth\JWT; use App\Service\SocketClientService; use App\Service\MessageHandleService; -use App\Service\SocketRoomService; use App\Model\Group\GroupMember; use App\Amqp\Producer\ChatMessageProducer; @@ -46,12 +46,6 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos */ private $socketClientService; - /** - * @inject - * @var SocketRoomService - */ - private $socketRoomService; - /** * @inject * @var MessageHandleService @@ -92,7 +86,7 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos // 加入群聊 $groupIds = GroupMember::getUserGroupIds($userInfo['user_id']); foreach ($groupIds as $group_id) { - $this->socketRoomService->addRoomMember($userInfo['user_id'], $group_id); + SocketRoom::getInstance()->addRoomMember(strval($group_id), strval($userInfo['user_id'])); } if (!$isOnline) { diff --git a/app/Service/SocketClientService.php b/app/Service/SocketClientService.php index 78661a2..9d6dab9 100644 --- a/app/Service/SocketClientService.php +++ b/app/Service/SocketClientService.php @@ -2,6 +2,7 @@ namespace App\Service; +use App\Cache\ServerRunID; use Hyperf\Redis\Redis; /** @@ -21,11 +22,6 @@ class SocketClientService */ const BIND_USER_TO_FDS = 'ws:user:fds'; - /** - * 运行检测超时时间(单位秒) - */ - const RUN_OVERTIME = 35; - /** * @var Redis */ @@ -86,7 +82,7 @@ class SocketClientService */ public function isOnlineAll(int $user_id, array $run_ids = []) { - $run_ids = $run_ids ?: $this->getServerRunIdAll(); + $run_ids = $run_ids ?: ServerRunID::getInstance()->getServerRunIdAll(); foreach ($run_ids as $run_id => $time) { if ($this->isOnline($user_id, $run_id)) return true; @@ -122,27 +118,6 @@ class SocketClientService }, $arr) : []; } - /** - * 获取服务ID列表 - * - * @param int $type 获取类型[1:正在运行;2:已超时;3:所有] - * @return array - */ - public function getServerRunIdAll(int $type = 1) - { - $arr = $this->redis->hGetAll('SERVER_RUN_ID'); - if ($type == 3) return $arr; - - $current_time = time(); - return array_filter($arr, function ($value) use ($current_time, $type) { - if ($type == 1) { - return ($current_time - intval($value)) <= self::RUN_OVERTIME; - } else { - return ($current_time - intval($value)) > self::RUN_OVERTIME; - } - }); - } - /** * 清除绑定缓存的信息 * @@ -153,7 +128,8 @@ class SocketClientService $this->redis->del(sprintf('%s:%s', self::BIND_FD_TO_USER, $run_id)); $prefix = sprintf('%s:%s', self::BIND_USER_TO_FDS, $run_id); - $this->redis->hDel('SERVER_RUN_ID', $run_id); + + ServerRunID::getInstance()->rem($run_id); $iterator = null; while (true) { diff --git a/app/Service/SocketRoomService.php b/app/Service/SocketRoomService.php deleted file mode 100644 index 6ed0c9b..0000000 --- a/app/Service/SocketRoomService.php +++ /dev/null @@ -1,70 +0,0 @@ -sMembers($this->getRoomName($room)); - } - - /** - * 成员加入房间 - * - * @param int $usr_id 用户ID - * @param string|array $room 房间名 - * @return bool|int - */ - public function addRoomMember(int $usr_id, $room) - { - return redis()->sAdd($this->getRoomName($room), $usr_id); - } - - /** - * 成员退出房间 - * - * @param string|array $room 房间名 - * @param string|array $members 用户ID - * @return int - */ - public function delRoomMember($room, $members) - { - return redis()->sRem($this->getRoomName($room), $members); - } - - /** - * 删除房间 - * - * @param string|int $room 房间名 - * @return int - */ - public function delRoom($room) - { - return redis()->del($this->getRoomName($room)); - } -} diff --git a/app/Service/TalkService.php b/app/Service/TalkService.php index 6b74d17..19d6d62 100644 --- a/app/Service/TalkService.php +++ b/app/Service/TalkService.php @@ -2,6 +2,7 @@ namespace App\Service; +use App\Cache\ServerRunID; use Exception; use App\Model\User; use App\Model\UsersChatList; @@ -48,7 +49,7 @@ class TalkService extends BaseService if (!$rows) return []; $socketFDService = make(SocketClientService::class); - $runIdAll = $socketFDService->getServerRunIdAll(); + $runIdAll = ServerRunID::getInstance()->getServerRunIdAll(); return array_map(function ($item) use ($user_id, $socketFDService, $runIdAll) { $data['id'] = $item['id']; diff --git a/app/Traits/StaticInstance.php b/app/Traits/StaticInstance.php new file mode 100644 index 0000000..e0aea86 --- /dev/null +++ b/app/Traits/StaticInstance.php @@ -0,0 +1,32 @@ +