优化代码

main
gzydong 2021-05-21 22:56:42 +08:00
parent 0a9dcb11af
commit 90d9b67f22
25 changed files with 447 additions and 279 deletions

View File

@ -26,9 +26,9 @@ use App\Model\Chat\ChatRecordsInvite;
use App\Model\Chat\ChatRecordsForward; use App\Model\Chat\ChatRecordsForward;
use App\Model\Group\Group; use App\Model\Group\Group;
use App\Service\SocketClientService; use App\Service\SocketClientService;
use App\Service\SocketRoomService;
use App\Constants\SocketConstants; use App\Constants\SocketConstants;
use App\Cache\Repository\LockRedis; use App\Cache\Repository\LockRedis;
use App\Cache\SocketRoom;
/** /**
* 消息推送消费者队列 * 消息推送消费者队列
@ -62,11 +62,6 @@ class ChatMessageConsumer extends ConsumerMessage
*/ */
private $socketClientService; private $socketClientService;
/**
* @var SocketRoomService
*/
private $socketRoomService;
/** /**
* 消息事件与回调事件绑定 * 消息事件与回调事件绑定
* *
@ -93,12 +88,10 @@ class ChatMessageConsumer extends ConsumerMessage
* ChatMessageConsumer constructor. * ChatMessageConsumer constructor.
* *
* @param SocketClientService $socketClientService * @param SocketClientService $socketClientService
* @param SocketRoomService $socketRoomService
*/ */
public function __construct(SocketClientService $socketClientService, SocketRoomService $socketRoomService) public function __construct(SocketClientService $socketClientService)
{ {
$this->socketClientService = $socketClientService; $this->socketClientService = $socketClientService;
$this->socketRoomService = $socketRoomService;
// 动态设置 Rabbit MQ 消费队列名称 // 动态设置 Rabbit MQ 消费队列名称
$this->setQueue('queue:im_message:' . SERVER_RUN_ID); $this->setQueue('queue:im_message:' . SERVER_RUN_ID);
@ -154,7 +147,7 @@ class ChatMessageConsumer extends ConsumerMessage
if ($source == 1) {// 私聊 if ($source == 1) {// 私聊
$fds = array_merge($fds, $this->socketClientService->findUserFds($data['data']['receive'])); $fds = array_merge($fds, $this->socketClientService->findUserFds($data['data']['receive']));
} else if ($source == 2) {// 群聊 } else if ($source == 2) {// 群聊
$userIds = $this->socketRoomService->getRoomMembers(strval($data['data']['receive'])); $userIds = SocketRoom::getInstance()->getRoomMembers(strval($data['data']['receive']));
foreach ($userIds as $uid) { foreach ($userIds as $uid) {
$fds = array_merge($fds, $this->socketClientService->findUserFds((int)$uid)); $fds = array_merge($fds, $this->socketClientService->findUserFds((int)$uid));
} }
@ -305,7 +298,7 @@ class ChatMessageConsumer extends ConsumerMessage
$fds = array_merge($fds, $this->socketClientService->findUserFds((int)$record->user_id)); $fds = array_merge($fds, $this->socketClientService->findUserFds((int)$record->user_id));
$fds = array_merge($fds, $this->socketClientService->findUserFds((int)$record->receive_id)); $fds = array_merge($fds, $this->socketClientService->findUserFds((int)$record->receive_id));
} else if ($record->source == 2) { } else if ($record->source == 2) {
$userIds = $this->socketRoomService->getRoomMembers(strval($record->receive_id)); $userIds = SocketRoom::getInstance()->getRoomMembers(strval($record->receive_id));
foreach ($userIds as $uid) { foreach ($userIds as $uid) {
$fds = array_merge($fds, $this->socketClientService->findUserFds((int)$uid)); $fds = array_merge($fds, $this->socketClientService->findUserFds((int)$uid));
} }

View File

@ -10,9 +10,9 @@
namespace App\Bootstrap; namespace App\Bootstrap;
use App\Cache\ServerRunID;
use Hyperf\Framework\Bootstrap\ServerStartCallback; use Hyperf\Framework\Bootstrap\ServerStartCallback;
use Swoole\Timer; use Swoole\Timer;
use Hyperf\Redis\Redis;
/** /**
* 自定义服务启动前回调事件 * 自定义服务启动前回调事件
@ -37,6 +37,6 @@ class ServerStart extends ServerStartCallback
*/ */
public function setRunIdTime() public function setRunIdTime()
{ {
container()->get(Redis::class)->hset('SERVER_RUN_ID', SERVER_RUN_ID, time()); ServerRunID::getInstance()->add(SERVER_RUN_ID, time());
} }
} }

View File

@ -0,0 +1,40 @@
<?php
namespace App\Cache\Repository;
use App\Traits\StaticInstance;
use Hyperf\Redis\Redis;
abstract class AbstractRedis
{
use StaticInstance;
protected $prefix = 'rds';
protected $name = '';
/**
* 获取 Redis 连接
*
* @return Redis|mixed
*/
protected function redis()
{
return redis();
}
/**
* 获取缓存 KEY
*
* @param string $key
* @return string
*/
protected function getCacheKey($key = '')
{
return implode(':', array_filter([
trim($this->prefix, ':'),
trim($this->name, ':'),
trim($key, ':')
]));
}
}

View File

@ -0,0 +1,80 @@
<?php
namespace App\Cache\Repository;
class HashGroupRedis extends AbstractRedis
{
protected $prefix = 'rds-hash:multi';
protected $name = 'default';
/**
* @param string $name
* @param string $key
* @param $value
* @return bool|int
*/
public function add(string $name, string $key, $value)
{
return $this->redis()->hSet($this->getCacheKey($name), $key, $value);
}
/**
* @param string $name
* @param $key
* @return false|string
*/
public function get(string $name, $key)
{
return $this->redis()->hGet($this->getCacheKey($name), $key);
}
/**
* @param string $name
* @return array
*/
public function getAll(string $name)
{
return $this->redis()->hGetAll($this->getCacheKey($name));
}
/**
* @param string $name
* @param string $key
* @return bool|int
*/
public function rem(string $name, string $key)
{
return $this->redis()->hDel($this->getCacheKey($name), $key);
}
/**
* @param string $name
* @param string $key
* @param int $value
* @return int
*/
public function incr(string $name, string $key, int $value = 1)
{
return $this->redis()->hIncrBy($this->getCacheKey($name), $key, $value);
}
/**
* @param string $name
* @param string $key
* @return bool
*/
public function isMember(string $name, string $key)
{
return $this->redis()->hExists($this->getCacheKey($name), $key);
}
/**
* @param string $name
* @return false|int
*/
public function count(string $name)
{
return $this->redis()->hLen($this->getCacheKey($name));
}
}

View File

@ -9,13 +9,11 @@ use App\Cache\Contracts\HashRedisInterface;
* *
* @package App\Cache\Repository * @package App\Cache\Repository
*/ */
class HashRedis implements HashRedisInterface class HashRedis extends AbstractRedis implements HashRedisInterface
{ {
use RedisTrait; protected $prefix = 'rds-hash';
private $prefix = 'rds:hash'; protected $name = 'default';
public $name = 'default';
/** /**
* 获取 Hash * 获取 Hash
@ -26,10 +24,10 @@ class HashRedis implements HashRedisInterface
public function get(string ...$key) public function get(string ...$key)
{ {
if (func_num_args() == 1) { if (func_num_args() == 1) {
return (string)$this->redis()->hGet($this->getKeyName(), $key[0]); return (string)$this->redis()->hGet($this->getCacheKey(), $key[0]);
} }
return $this->redis()->hMGet($this->getKeyName(), $key); return $this->redis()->hMGet($this->getCacheKey(), $key);
} }
/** /**
@ -40,7 +38,7 @@ class HashRedis implements HashRedisInterface
*/ */
public function add(string $key, $value) public function add(string $key, $value)
{ {
$this->redis()->hSet($this->getKeyName(), $key, $value); $this->redis()->hSet($this->getCacheKey(), $key, $value);
} }
/** /**
@ -51,7 +49,7 @@ class HashRedis implements HashRedisInterface
*/ */
public function rem(string ...$key) public function rem(string ...$key)
{ {
return $this->redis()->hDel($this->getKeyName(), ...$key); return $this->redis()->hDel($this->getCacheKey(), ...$key);
} }
/** /**
@ -63,7 +61,7 @@ class HashRedis implements HashRedisInterface
*/ */
public function incr(string $member, int $score) public function incr(string $member, int $score)
{ {
return $this->redis()->hincrby($this->getKeyName(), $member, $score); return $this->redis()->hincrby($this->getCacheKey(), $member, $score);
} }
/** /**
@ -73,7 +71,7 @@ class HashRedis implements HashRedisInterface
*/ */
public function count() public function count()
{ {
return (int)$this->redis()->hLen($this->getKeyName()); return (int)$this->redis()->hLen($this->getCacheKey());
} }
/** /**
@ -83,7 +81,7 @@ class HashRedis implements HashRedisInterface
*/ */
public function all() public function all()
{ {
return $this->redis()->hGetAll($this->getKeyName()); return $this->redis()->hGetAll($this->getCacheKey());
} }
/** /**
@ -94,7 +92,7 @@ class HashRedis implements HashRedisInterface
*/ */
public function isMember(string $key) public function isMember(string $key)
{ {
return $this->redis()->hExists($this->getKeyName(), $key); return $this->redis()->hExists($this->getCacheKey(), $key);
} }
/** /**
@ -104,6 +102,6 @@ class HashRedis implements HashRedisInterface
*/ */
public function delete() public function delete()
{ {
return (bool)$this->redis()->del($this->getKeyName()); return (bool)$this->redis()->del($this->getCacheKey());
} }
} }

View File

@ -9,13 +9,11 @@ use App\Cache\Contracts\ListRedisInterface;
* *
* @package App\Cache\Repository * @package App\Cache\Repository
*/ */
class ListRedis implements ListRedisInterface class ListRedis extends AbstractRedis implements ListRedisInterface
{ {
use RedisTrait; protected $prefix = 'rds-list';
private $prefix = 'rds:list'; protected $name = 'default';
public $name = 'default';
/** /**
* Push 队列任务 * Push 队列任务
@ -25,7 +23,7 @@ class ListRedis implements ListRedisInterface
*/ */
public function push(string ...$value) public function push(string ...$value)
{ {
return $this->redis()->lPush($this->getKeyName(), ...$value); return $this->redis()->lPush($this->getCacheKey(), ...$value);
} }
/** /**
@ -35,7 +33,7 @@ class ListRedis implements ListRedisInterface
*/ */
public function pop() public function pop()
{ {
return $this->redis()->rPop($this->getKeyName()); return $this->redis()->rPop($this->getCacheKey());
} }
/** /**
@ -45,7 +43,7 @@ class ListRedis implements ListRedisInterface
*/ */
public function count() public function count()
{ {
return (int)$this->redis()->lLen($this->getKeyName()); return (int)$this->redis()->lLen($this->getCacheKey());
} }
/** /**
@ -55,7 +53,7 @@ class ListRedis implements ListRedisInterface
*/ */
public function clear() public function clear()
{ {
return $this->redis()->lTrim($this->getKeyName(), 1, 0); return $this->redis()->lTrim($this->getCacheKey(), 1, 0);
} }
/** /**
@ -65,7 +63,7 @@ class ListRedis implements ListRedisInterface
*/ */
public function all() public function all()
{ {
return $this->redis()->lRange($this->getKeyName(), 0, -1); return $this->redis()->lRange($this->getCacheKey(), 0, -1);
} }
/** /**
@ -75,6 +73,6 @@ class ListRedis implements ListRedisInterface
*/ */
public function delete() public function delete()
{ {
return (bool)$this->redis()->del($this->getKeyName()); return (bool)$this->redis()->del($this->getCacheKey());
} }
} }

View File

@ -9,13 +9,11 @@ use App\Cache\Contracts\LockRedisInterface;
* *
* @package App\Cache\Repository * @package App\Cache\Repository
*/ */
class LockRedis implements LockRedisInterface class LockRedis extends AbstractRedis implements LockRedisInterface
{ {
use RedisTrait; protected $prefix = 'rds-lock';
private $prefix = 'rds:lock'; protected $lockValue = 1;
private $lockValue = 1;
/** /**
* 获取是毫秒时间戳 * 获取是毫秒时间戳

View File

@ -1,63 +0,0 @@
<?php
namespace App\Cache\Repository;
use Hyperf\Redis\Redis;
trait RedisTrait
{
private static $instance;
/**
* 获取单例
*
* @return static
*/
static public function getInstance()
{
if (!(self::$instance instanceof static)) {
self::$instance = new static();
}
return self::$instance;
}
/**
* 获取 Redis 连接
*
* @return Redis|mixed
*/
protected function redis()
{
return redis();
}
/**
* 获取缓存 KEY
*
* @param string $key
* @return string
*/
private function getCacheKey(string $key)
{
return sprintf('%s:%s', trim($this->prefix, ':'), trim($key, ':'));
}
/**
* 获取缓存 KEY
*
* @return string
*/
protected function getKeyName()
{
return $this->getCacheKey($this->name);
}
/**
* 加载数据到缓存
*/
public function reload()
{
}
}

View File

@ -0,0 +1,90 @@
<?php
namespace App\Cache\Repository;
class SetGroupRedis extends AbstractRedis
{
protected $prefix = 'rds-set';
protected $name = 'default';
/**
* 添加成员
*
* @param string $name 分组名
* @param string|int ...$member 分组成员
* @return bool|int
*/
public function add(string $name, ...$member)
{
return $this->redis()->sAdd($this->getCacheKey($name), ...$member);
}
/**
* 删除成员
*
* @param string $name 分组名
* @param string|int ...$member 分组成员
* @return int
*/
public function rem(string $name, ...$member)
{
return $this->redis()->sRem($this->getCacheKey($name), ...$member);
}
/**
* 判断成员是否存在
*
* @param string $name 分组名
* @param string $key 分组成员
* @return bool
*/
public function isMember(string $name, string $key)
{
return $this->redis()->sIsMember($this->getCacheKey($name), $key);
}
/**
* 获取分组中所有成员
*
* @param string $name 分组名
* @return array
*/
public function all(string $name)
{
return $this->redis()->sMembers($this->getCacheKey($name));
}
/**
* 获取分组中成员数量
*
* @param string $name 分组名
* @return int
*/
public function count(string $name)
{
return $this->redis()->sCard($this->getCacheKey($name));
}
/**
* 删除分组
*
* @param string $name 分组名
* @return int
*/
public function delete(string $name)
{
return $this->redis()->del($this->getCacheKey($name));
}
/**
* 获取随机集合中的元素
*
* @param int $count
* @return array|bool|mixed|string
*/
public function randMember(string $name, $count = 1)
{
return $this->redis()->sRandMember($this->getCacheKey($name), $count);
}
}

View File

@ -1,8 +1,8 @@
<?php <?php
namespace App\Cache\Repository; namespace App\Cache\Repository;
use App\Traits\StaticInstance;
use App\Cache\Contracts\SetRedisInterface; use App\Cache\Contracts\SetRedisInterface;
/** /**
@ -10,13 +10,11 @@ use App\Cache\Contracts\SetRedisInterface;
* *
* @package App\Cache\Repository * @package App\Cache\Repository
*/ */
class SetRedis implements SetRedisInterface class SetRedis extends AbstractRedis implements SetRedisInterface
{ {
use RedisTrait; protected $prefix = 'rds-set';
private $prefix = 'rds:set'; protected $name = 'default';
public $name = 'default';
/** /**
* 添加集合元素 * 添加集合元素
@ -26,7 +24,7 @@ class SetRedis implements SetRedisInterface
*/ */
public function add(string ...$member) public function add(string ...$member)
{ {
return $this->redis()->sAdd($this->getKeyName(), ...$member); return $this->redis()->sAdd($this->getCacheKey(), ...$member);
} }
/** /**
@ -37,7 +35,7 @@ class SetRedis implements SetRedisInterface
*/ */
public function rem(string ...$member) public function rem(string ...$member)
{ {
return $this->redis()->sRem($this->getKeyName(), ...$member); return $this->redis()->sRem($this->getCacheKey(), ...$member);
} }
/** /**
@ -48,7 +46,7 @@ class SetRedis implements SetRedisInterface
*/ */
public function isMember(string $member) public function isMember(string $member)
{ {
return $this->redis()->sIsMember($this->getKeyName(), $member); return $this->redis()->sIsMember($this->getCacheKey(), $member);
} }
/** /**
@ -58,7 +56,7 @@ class SetRedis implements SetRedisInterface
*/ */
public function all() public function all()
{ {
return $this->redis()->sMembers($this->getKeyName()); return $this->redis()->sMembers($this->getCacheKey());
} }
/** /**
@ -68,7 +66,7 @@ class SetRedis implements SetRedisInterface
*/ */
public function count() public function count()
{ {
return $this->redis()->scard($this->getKeyName()); return $this->redis()->scard($this->getCacheKey());
} }
/** /**
@ -79,7 +77,7 @@ class SetRedis implements SetRedisInterface
*/ */
public function randMember($count = 1) public function randMember($count = 1)
{ {
return $this->redis()->sRandMember($this->getKeyName(), $count); return $this->redis()->sRandMember($this->getCacheKey(), $count);
} }
/** /**
@ -89,6 +87,6 @@ class SetRedis implements SetRedisInterface
*/ */
public function delete() public function delete()
{ {
return (bool)$this->redis()->del($this->getKeyName()); return (bool)$this->redis()->del($this->getCacheKey());
} }
} }

View File

@ -3,15 +3,14 @@
namespace App\Cache\Repository; namespace App\Cache\Repository;
use Closure; use Closure;
use App\Traits\StaticInstance;
use App\Cache\Contracts\StreamRedisInterface; use App\Cache\Contracts\StreamRedisInterface;
class StreamRedis implements StreamRedisInterface class StreamRedis extends AbstractRedis implements StreamRedisInterface
{ {
use RedisTrait; protected $prefix = 'rds-stream';
private $prefix = 'rds:stream'; protected $name = 'default';
public $name = 'default';
/** /**
* 添加消息 * 添加消息
@ -23,7 +22,7 @@ class StreamRedis implements StreamRedisInterface
*/ */
public function add(array $messages, $maxLen = 0, $isApproximate = false) public function add(array $messages, $maxLen = 0, $isApproximate = false)
{ {
return $this->redis()->xAdd($this->getKeyName(), '*', $messages, $maxLen, $isApproximate); return $this->redis()->xAdd($this->getCacheKey(), '*', $messages, $maxLen, $isApproximate);
} }
/** /**
@ -34,7 +33,7 @@ class StreamRedis implements StreamRedisInterface
*/ */
public function rem(string ...$id) public function rem(string ...$id)
{ {
return $this->redis()->xDel($this->getKeyName(), $id); return $this->redis()->xDel($this->getCacheKey(), $id);
} }
/** /**
@ -46,7 +45,7 @@ class StreamRedis implements StreamRedisInterface
*/ */
public function ack(string $group, string $id) public function ack(string $group, string $id)
{ {
return $this->redis()->xAck($this->getKeyName(), $group, [$id]); return $this->redis()->xAck($this->getCacheKey(), $group, [$id]);
} }
/** /**
@ -56,7 +55,7 @@ class StreamRedis implements StreamRedisInterface
*/ */
public function count() public function count()
{ {
return $this->redis()->xLen($this->getKeyName()); return $this->redis()->xLen($this->getCacheKey());
} }
/** /**
@ -66,7 +65,7 @@ class StreamRedis implements StreamRedisInterface
*/ */
public function all() public function all()
{ {
return $this->redis()->xRange($this->getKeyName(), '-', '+'); return $this->redis()->xRange($this->getCacheKey(), '-', '+');
} }
/** /**
@ -90,7 +89,7 @@ class StreamRedis implements StreamRedisInterface
*/ */
public function delete() public function delete()
{ {
return $this->redis()->del($this->getKeyName()); return $this->redis()->del($this->getCacheKey());
} }
/** /**
@ -102,17 +101,17 @@ class StreamRedis implements StreamRedisInterface
*/ */
public function trim(int $maxLen, bool $isApproximate = false) public function trim(int $maxLen, bool $isApproximate = false)
{ {
return $this->redis()->xTrim($this->getKeyName(), $maxLen, $isApproximate); return $this->redis()->xTrim($this->getCacheKey(), $maxLen, $isApproximate);
} }
public function group($operation, $group, $msgId = '', $mkStream = false) public function group($operation, $group, $msgId = '', $mkStream = false)
{ {
return $this->redis()->xGroup($operation, $this->getKeyName(), $group, $msgId, $mkStream); return $this->redis()->xGroup($operation, $this->getCacheKey(), $group, $msgId, $mkStream);
} }
public function pending($group, $start = null, $end = null, $count = null, $consumer = null) public function pending($group, $start = null, $end = null, $count = null, $consumer = null)
{ {
return $this->redis()->xPending($this->getKeyName(), $group, $start, $end, $count, $consumer); return $this->redis()->xPending($this->getCacheKey(), $group, $start, $end, $count, $consumer);
} }
/** /**
@ -123,7 +122,7 @@ class StreamRedis implements StreamRedisInterface
*/ */
public function info(string $operation = 'stream') public function info(string $operation = 'stream')
{ {
return $this->redis()->xInfo($operation, $this->getKeyName()); return $this->redis()->xInfo($operation, $this->getCacheKey());
} }
/** /**
@ -139,13 +138,13 @@ class StreamRedis implements StreamRedisInterface
$this->group('create', $group, '0'); $this->group('create', $group, '0');
while (true) { while (true) {
$tasks = $this->redis()->xReadGroup($group, $consumer, [$this->getKeyName() => '>'], $count); $tasks = $this->redis()->xReadGroup($group, $consumer, [$this->getCacheKey() => '>'], $count);
if (empty($tasks)) { if (empty($tasks)) {
sleep(1);// 获取不到任务,延时一秒 sleep(1);// 获取不到任务,延时一秒
continue; continue;
} }
foreach ($tasks[$this->getKeyName()] as $id => $task) { foreach ($tasks[$this->getCacheKey()] as $id => $task) {
if ($closure($id, $task)) { if ($closure($id, $task)) {
$this->ack($group, $id); $this->ack($group, $id);
} }

View File

@ -2,6 +2,7 @@
namespace App\Cache\Repository; namespace App\Cache\Repository;
use App\Traits\StaticInstance;
use App\Cache\Contracts\StringRedisInterface; use App\Cache\Contracts\StringRedisInterface;
/** /**
@ -9,11 +10,11 @@ use App\Cache\Contracts\StringRedisInterface;
* *
* @package App\Cache\Repository * @package App\Cache\Repository
*/ */
class StringRedis implements StringRedisInterface class StringRedis extends AbstractRedis implements StringRedisInterface
{ {
use RedisTrait; protected $prefix = 'rds-string';
private $prefix = 'rds:string'; protected $name = '';
/** /**
* 设置缓存 * 设置缓存

View File

@ -9,13 +9,11 @@ use App\Cache\Contracts\ZSetRedisInterface;
* *
* @package App\Cache\Repository * @package App\Cache\Repository
*/ */
class ZSetRedis implements ZSetRedisInterface class ZSetRedis extends AbstractRedis implements ZSetRedisInterface
{ {
use RedisTrait; protected $prefix = 'rds-zset';
private $prefix = 'rds:zset'; protected $name = 'default';
public $name = 'default';
/** /**
* 添加有序集合元素 * 添加有序集合元素
@ -26,7 +24,7 @@ class ZSetRedis implements ZSetRedisInterface
*/ */
public function add(string $member, float $score) public function add(string $member, float $score)
{ {
return $this->redis()->zAdd($this->getKeyName(), $score, $member); return $this->redis()->zAdd($this->getCacheKey(), $score, $member);
} }
/** /**
@ -37,7 +35,7 @@ class ZSetRedis implements ZSetRedisInterface
*/ */
public function rem(string ...$member) public function rem(string ...$member)
{ {
return $this->redis()->zRem($this->getKeyName(), ...$member); return $this->redis()->zRem($this->getCacheKey(), ...$member);
} }
/** /**
@ -49,7 +47,7 @@ class ZSetRedis implements ZSetRedisInterface
*/ */
public function incr(string $member, float $score) public function incr(string $member, float $score)
{ {
return $this->redis()->zIncrBy($this->getKeyName(), $score, $member); return $this->redis()->zIncrBy($this->getCacheKey(), $score, $member);
} }
/** /**
@ -59,7 +57,7 @@ class ZSetRedis implements ZSetRedisInterface
*/ */
public function count() public function count()
{ {
return $this->redis()->zCard($this->getKeyName()); return $this->redis()->zCard($this->getCacheKey());
} }
/** /**
@ -71,7 +69,7 @@ class ZSetRedis implements ZSetRedisInterface
*/ */
public function all($asc = true, $is_score = true) public function all($asc = true, $is_score = true)
{ {
return $this->redis()->{$asc ? 'zRevRange' : 'zRange'}($this->getKeyName(), 0, -1, $is_score); return $this->redis()->{$asc ? 'zRevRange' : 'zRange'}($this->getCacheKey(), 0, -1, $is_score);
} }
/** /**
@ -88,7 +86,7 @@ class ZSetRedis implements ZSetRedisInterface
[$start, $end] = $asc ? ['+inf', '-inf'] : ['-inf', '+inf']; [$start, $end] = $asc ? ['+inf', '-inf'] : ['-inf', '+inf'];
$rows = $this->redis()->{$asc ? 'zRevRangeByScore' : 'zRangeByScore'}($this->getKeyName(), $start, $end, [ $rows = $this->redis()->{$asc ? 'zRevRangeByScore' : 'zRangeByScore'}($this->getCacheKey(), $start, $end, [
'withscores' => true, 'withscores' => true,
'limit' => [($page - 1) * $size, $size] 'limit' => [($page - 1) * $size, $size]
]); ]);
@ -120,7 +118,7 @@ class ZSetRedis implements ZSetRedisInterface
*/ */
public function range(string $start, string $end, array $options = []) public function range(string $start, string $end, array $options = [])
{ {
return $this->redis()->zRangeByScore($this->getKeyName(), $start, $end, $options); return $this->redis()->zRangeByScore($this->getCacheKey(), $start, $end, $options);
} }
/** /**
@ -132,7 +130,7 @@ class ZSetRedis implements ZSetRedisInterface
*/ */
public function getMemberRank(string $member, $asc = true) public function getMemberRank(string $member, $asc = true)
{ {
return $this->redis()->{$asc ? 'zRevRank' : 'zRank'}($this->getKeyName(), $member) + 1; return $this->redis()->{$asc ? 'zRevRank' : 'zRank'}($this->getCacheKey(), $member) + 1;
} }
/** /**
@ -143,7 +141,7 @@ class ZSetRedis implements ZSetRedisInterface
*/ */
public function getMemberScore(string $member) public function getMemberScore(string $member)
{ {
return $this->redis()->zScore($this->getKeyName(), $member); return $this->redis()->zScore($this->getCacheKey(), $member);
} }
/** /**
@ -164,6 +162,6 @@ class ZSetRedis implements ZSetRedisInterface
*/ */
public function delete() public function delete()
{ {
return (bool)$this->redis()->del($this->getKeyName()); return (bool)$this->redis()->del($this->getCacheKey());
} }
} }

39
app/Cache/ServerRunID.php Normal file
View File

@ -0,0 +1,39 @@
<?php
namespace App\Cache;
use App\Cache\Repository\HashRedis;
class ServerRunID extends HashRedis
{
protected $prefix = 'SERVER_RUN_ID';
protected $name = '';
/**
* 运行检测超时时间(单位秒)
*/
const RUN_OVERTIME = 35;
/**
* 获取服务ID列表
*
* @param int $type 获取类型[1:正在运行;2:已超时;3:所有]
* @return array
*/
public function getServerRunIdAll(int $type = 1)
{
$arr = $this->all();
if ($type == 3) return $arr;
$current_time = time();
return array_filter($arr, function ($value) use ($current_time, $type) {
if ($type == 1) {
return ($current_time - intval($value)) <= self::RUN_OVERTIME;
} else {
return ($current_time - intval($value)) > self::RUN_OVERTIME;
}
});
}
}

60
app/Cache/SocketRoom.php Normal file
View File

@ -0,0 +1,60 @@
<?php
namespace App\Cache;
use App\Cache\Repository\SetGroupRedis;
class SocketRoom extends SetGroupRedis
{
protected $name = 'ws:room';
/**
* 获取房间名
*
* @param string|integer $room 房间名
* @return string
*/
public function getRoomName($room)
{
return $this->getCacheKey($room);
}
/**
* 获取房间中所有的用户ID
*
* @param string $room 房间名
* @return array
*/
public function getRoomMembers(string $room)
{
return $this->all($room);
}
/**
* 添加房间成员
*
* @param string $room 房间名
* @param string ...$member 用户ID
* @return bool|int
*/
public function addRoomMember(string $room, string ...$member)
{
return $this->add($room, ...$member);
}
public function delRoomMember($room, string ...$member)
{
return $this->rem($room, ...$member);
}
/**
* 删除房间
*
* @param string|int $room 房间名
* @return int
*/
public function delRoom($room)
{
return $this->delete($room);
}
}

View File

@ -44,7 +44,7 @@ class UnreadTalk extends HashRedis
*/ */
public function reset(int $sender, int $receive) public function reset(int $sender, int $receive)
{ {
$this->add($this->flag($sender, $receive), 0); $this->rem($this->flag($sender, $receive));
} }
/** /**
@ -67,7 +67,7 @@ class UnreadTalk extends HashRedis
{ {
$iterator = null; $iterator = null;
$arr = []; $arr = [];
while ($elements = $this->redis()->hscan($this->getKeyName(), $iterator, '*_' . $user_id, 20)) { while ($elements = $this->redis()->hscan($this->getCacheKey(), $iterator, '*_' . $user_id, 20)) {
foreach ($elements as $key => $value) { foreach ($elements as $key => $value) {
$arr[explode('_', $key)[0]] = $value; $arr[explode('_', $key)[0]] = $value;
} }

View File

@ -11,6 +11,7 @@ declare(strict_types=1);
namespace App\Command; namespace App\Command;
use App\Cache\ServerRunID;
use Hyperf\Command\Annotation\Command; use Hyperf\Command\Annotation\Command;
use Hyperf\Command\Command as HyperfCommand; use Hyperf\Command\Command as HyperfCommand;
use Psr\Container\ContainerInterface; use Psr\Container\ContainerInterface;
@ -45,8 +46,7 @@ class RemoveWsCacheCommand extends HyperfCommand
$this->line('此过程可能耗时较长,请耐心等待!', 'info'); $this->line('此过程可能耗时较长,请耐心等待!', 'info');
// 获取所有已停止运行的服务ID // 获取所有已停止运行的服务ID
$arr = $socket->getServerRunIdAll(2); $arr = ServerRunID::getInstance()->getServerRunIdAll(2);
foreach ($arr as $run_id => $value) { foreach ($arr as $run_id => $value) {
go(function () use ($socket, $run_id) { go(function () use ($socket, $run_id) {
$socket->removeRedisCache(strval($run_id)); $socket->removeRedisCache(strval($run_id));

View File

@ -9,10 +9,13 @@ use App\Cache\LastMessage;
use App\Cache\Repository\HashRedis; use App\Cache\Repository\HashRedis;
use App\Cache\Repository\ListRedis; use App\Cache\Repository\ListRedis;
use App\Cache\Repository\LockRedis; use App\Cache\Repository\LockRedis;
use App\Cache\Repository\HashGroupRedis;
use App\Cache\Repository\SetGroupRedis;
use App\Cache\Repository\SetRedis; use App\Cache\Repository\SetRedis;
use App\Cache\Repository\StreamRedis; use App\Cache\Repository\StreamRedis;
use App\Cache\Repository\StringRedis; use App\Cache\Repository\StringRedis;
use App\Cache\Repository\ZSetRedis; use App\Cache\Repository\ZSetRedis;
use App\Cache\SocketRoom;
use App\Cache\UnreadTalk; use App\Cache\UnreadTalk;
use App\Service\TalkService; use App\Service\TalkService;
use Hyperf\Command\Command as HyperfCommand; use Hyperf\Command\Command as HyperfCommand;
@ -53,7 +56,7 @@ class TestCommand extends HyperfCommand
//var_dump($string->ttl('yuandong')); //var_dump($string->ttl('yuandong'));
//var_dump($string->isExist('yuandong')); //var_dump($string->isExist('yuandong'));
$hash = HashRedis::getInstance(); //$hash = HashRedis::getInstance();
//for ($i = 0; $i < 10; $i++) { //for ($i = 0; $i < 10; $i++) {
// $hash->add('user:' . $i, (string)rand(0, 100)); // $hash->add('user:' . $i, (string)rand(0, 100));
//} //}
@ -128,5 +131,13 @@ class TestCommand extends HyperfCommand
//$model->talks(2054); //$model->talks(2054);
//var_dump(FriendRemark::getInstance()->read(2054,2055)); //var_dump(FriendRemark::getInstance()->read(2054,2055));
//$socketRoom = SocketRoom::getInstance();
//$socketRoom->addRoomMember('');
//$keys = redis()->keys('rds-set*');
//foreach ($keys as $key) {
// redis()->del($keys);
//}
} }
} }

View File

@ -12,6 +12,7 @@ namespace App\Controller\Api\V1;
use App\Cache\FriendApply; use App\Cache\FriendApply;
use App\Cache\FriendRemark; use App\Cache\FriendRemark;
use App\Cache\ServerRunID;
use App\Model\UsersFriendsApply; use App\Model\UsersFriendsApply;
use Hyperf\Di\Annotation\Inject; use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller; use Hyperf\HttpServer\Annotation\Controller;
@ -59,7 +60,7 @@ class ContactsController extends CController
{ {
$rows = $this->contactsService->getContacts($this->uid()); $rows = $this->contactsService->getContacts($this->uid());
if ($rows) { if ($rows) {
$runArr = $this->socketClientService->getServerRunIdAll(); $runArr = ServerRunID::getInstance()->getServerRunIdAll();
foreach ($rows as $k => $row) { foreach ($rows as $k => $row) {
// 查询用户当前是否在线 // 查询用户当前是否在线
$rows[$k]['online'] = $this->socketClientService->isOnlineAll($row['id'], $runArr); $rows[$k]['online'] = $this->socketClientService->isOnlineAll($row['id'], $runArr);

View File

@ -10,6 +10,7 @@
namespace App\Controller\Api\V1; namespace App\Controller\Api\V1;
use App\Cache\SocketRoom;
use Hyperf\Di\Annotation\Inject; use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller; use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\RequestMapping; use Hyperf\HttpServer\Annotation\RequestMapping;
@ -21,7 +22,6 @@ use App\Model\Group\Group;
use App\Model\Group\GroupMember; use App\Model\Group\GroupMember;
use App\Model\Group\GroupNotice; use App\Model\Group\GroupNotice;
use App\Amqp\Producer\ChatMessageProducer; use App\Amqp\Producer\ChatMessageProducer;
use App\Service\SocketRoomService;
use App\Service\GroupService; use App\Service\GroupService;
use App\Constants\SocketConstants; use App\Constants\SocketConstants;
use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\ResponseInterface;
@ -41,12 +41,6 @@ class GroupController extends CController
*/ */
private $groupService; private $groupService;
/**
* @Inject
* @var SocketRoomService
*/
private $socketRoomService;
/** /**
* 创建群组 * 创建群组
* @RequestMapping(path="create", methods="post") * @RequestMapping(path="create", methods="post")
@ -77,7 +71,7 @@ class GroupController extends CController
// 加入聊天室 // 加入聊天室
$friend_ids[] = $user_id; $friend_ids[] = $user_id;
foreach ($friend_ids as $uid) { foreach ($friend_ids as $uid) {
$this->socketRoomService->addRoomMember($uid, $data['group_id']); SocketRoom::getInstance()->addRoomMember(strval($data['group_id']), $uid);
} }
// ... 消息推送队列 // ... 消息推送队列
@ -111,7 +105,7 @@ class GroupController extends CController
return $this->response->fail('群组解散失败!'); return $this->response->fail('群组解散失败!');
} }
$this->socketRoomService->delRoom($params['group_id']); SocketRoom::getInstance()->delRoom($params['group_id']);
// ... TODO 推送群消息(预留) // ... TODO 推送群消息(预留)
@ -142,7 +136,7 @@ class GroupController extends CController
// 加入聊天室 // 加入聊天室
foreach ($uids as $uid) { foreach ($uids as $uid) {
$this->socketRoomService->addRoomMember($uid, $params['group_id']); SocketRoom::getInstance()->addRoomMember($params['group_id'], $uid);
} }
// 消息推送队列 // 消息推送队列
@ -176,7 +170,7 @@ class GroupController extends CController
} }
// 移出聊天室 // 移出聊天室
$this->socketRoomService->delRoomMember($params['group_id'], $user_id); SocketRoom::getInstance()->delRoomMember($params['group_id'], $user_id);
// 消息推送队列 // 消息推送队列
push_amqp(new ChatMessageProducer(SocketConstants::EVENT_TALK, [ push_amqp(new ChatMessageProducer(SocketConstants::EVENT_TALK, [
@ -246,7 +240,7 @@ class GroupController extends CController
// 移出聊天室 // 移出聊天室
foreach ($params['members_ids'] as $uid) { foreach ($params['members_ids'] as $uid) {
$this->socketRoomService->delRoomMember($params['group_id'], $uid); SocketRoom::getInstance()->delRoomMember($params['group_id'], strval($uid));
} }
// 消息推送队列 // 消息推送队列

View File

@ -11,6 +11,7 @@ declare(strict_types=1);
namespace App\Controller; namespace App\Controller;
use App\Cache\SocketRoom;
use Hyperf\Di\Annotation\Inject; use Hyperf\Di\Annotation\Inject;
use App\Constants\SocketConstants; use App\Constants\SocketConstants;
use Hyperf\Contract\OnCloseInterface; use Hyperf\Contract\OnCloseInterface;
@ -23,7 +24,6 @@ use Swoole\WebSocket\Server;
use Phper666\JWTAuth\JWT; use Phper666\JWTAuth\JWT;
use App\Service\SocketClientService; use App\Service\SocketClientService;
use App\Service\MessageHandleService; use App\Service\MessageHandleService;
use App\Service\SocketRoomService;
use App\Model\Group\GroupMember; use App\Model\Group\GroupMember;
use App\Amqp\Producer\ChatMessageProducer; use App\Amqp\Producer\ChatMessageProducer;
@ -46,12 +46,6 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
*/ */
private $socketClientService; private $socketClientService;
/**
* @inject
* @var SocketRoomService
*/
private $socketRoomService;
/** /**
* @inject * @inject
* @var MessageHandleService * @var MessageHandleService
@ -92,7 +86,7 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
// 加入群聊 // 加入群聊
$groupIds = GroupMember::getUserGroupIds($userInfo['user_id']); $groupIds = GroupMember::getUserGroupIds($userInfo['user_id']);
foreach ($groupIds as $group_id) { foreach ($groupIds as $group_id) {
$this->socketRoomService->addRoomMember($userInfo['user_id'], $group_id); SocketRoom::getInstance()->addRoomMember(strval($group_id), strval($userInfo['user_id']));
} }
if (!$isOnline) { if (!$isOnline) {

View File

@ -2,6 +2,7 @@
namespace App\Service; namespace App\Service;
use App\Cache\ServerRunID;
use Hyperf\Redis\Redis; use Hyperf\Redis\Redis;
/** /**
@ -21,11 +22,6 @@ class SocketClientService
*/ */
const BIND_USER_TO_FDS = 'ws:user:fds'; const BIND_USER_TO_FDS = 'ws:user:fds';
/**
* 运行检测超时时间(单位秒)
*/
const RUN_OVERTIME = 35;
/** /**
* @var Redis * @var Redis
*/ */
@ -86,7 +82,7 @@ class SocketClientService
*/ */
public function isOnlineAll(int $user_id, array $run_ids = []) public function isOnlineAll(int $user_id, array $run_ids = [])
{ {
$run_ids = $run_ids ?: $this->getServerRunIdAll(); $run_ids = $run_ids ?: ServerRunID::getInstance()->getServerRunIdAll();
foreach ($run_ids as $run_id => $time) { foreach ($run_ids as $run_id => $time) {
if ($this->isOnline($user_id, $run_id)) return true; if ($this->isOnline($user_id, $run_id)) return true;
@ -122,27 +118,6 @@ class SocketClientService
}, $arr) : []; }, $arr) : [];
} }
/**
* 获取服务ID列表
*
* @param int $type 获取类型[1:正在运行;2:已超时;3:所有]
* @return array
*/
public function getServerRunIdAll(int $type = 1)
{
$arr = $this->redis->hGetAll('SERVER_RUN_ID');
if ($type == 3) return $arr;
$current_time = time();
return array_filter($arr, function ($value) use ($current_time, $type) {
if ($type == 1) {
return ($current_time - intval($value)) <= self::RUN_OVERTIME;
} else {
return ($current_time - intval($value)) > self::RUN_OVERTIME;
}
});
}
/** /**
* 清除绑定缓存的信息 * 清除绑定缓存的信息
* *
@ -153,7 +128,8 @@ class SocketClientService
$this->redis->del(sprintf('%s:%s', self::BIND_FD_TO_USER, $run_id)); $this->redis->del(sprintf('%s:%s', self::BIND_FD_TO_USER, $run_id));
$prefix = sprintf('%s:%s', self::BIND_USER_TO_FDS, $run_id); $prefix = sprintf('%s:%s', self::BIND_USER_TO_FDS, $run_id);
$this->redis->hDel('SERVER_RUN_ID', $run_id);
ServerRunID::getInstance()->rem($run_id);
$iterator = null; $iterator = null;
while (true) { while (true) {

View File

@ -1,70 +0,0 @@
<?php
namespace App\Service;
/**
* 聊天室房间服务
*
* @package App\Service
*/
class SocketRoomService
{
const ROOM = 'ws:room';
/**
* 获取房间名
*
* @param string|integer $room 房间名
* @return string
*/
public function getRoomName($room)
{
return sprintf('%s:%s', self::ROOM, $room);
}
/**
* 获取房间成员
*
* @param string $room 房间名
* @return array
*/
public function getRoomMembers(string $room)
{
return redis()->sMembers($this->getRoomName($room));
}
/**
* 成员加入房间
*
* @param int $usr_id 用户ID
* @param string|array $room 房间名
* @return bool|int
*/
public function addRoomMember(int $usr_id, $room)
{
return redis()->sAdd($this->getRoomName($room), $usr_id);
}
/**
* 成员退出房间
*
* @param string|array $room 房间名
* @param string|array $members 用户ID
* @return int
*/
public function delRoomMember($room, $members)
{
return redis()->sRem($this->getRoomName($room), $members);
}
/**
* 删除房间
*
* @param string|int $room 房间名
* @return int
*/
public function delRoom($room)
{
return redis()->del($this->getRoomName($room));
}
}

View File

@ -2,6 +2,7 @@
namespace App\Service; namespace App\Service;
use App\Cache\ServerRunID;
use Exception; use Exception;
use App\Model\User; use App\Model\User;
use App\Model\UsersChatList; use App\Model\UsersChatList;
@ -48,7 +49,7 @@ class TalkService extends BaseService
if (!$rows) return []; if (!$rows) return [];
$socketFDService = make(SocketClientService::class); $socketFDService = make(SocketClientService::class);
$runIdAll = $socketFDService->getServerRunIdAll(); $runIdAll = ServerRunID::getInstance()->getServerRunIdAll();
return array_map(function ($item) use ($user_id, $socketFDService, $runIdAll) { return array_map(function ($item) use ($user_id, $socketFDService, $runIdAll) {
$data['id'] = $item['id']; $data['id'] = $item['id'];

View File

@ -0,0 +1,32 @@
<?php
namespace App\Traits;
/**
* Trait StaticInstance
*
* @package App\Traits
*/
trait StaticInstance
{
private static $instance;
/**
* 获取单例
*
* @return static
*/
static public function getInstance()
{
if (!(self::$instance instanceof static)) {
self::$instance = new static();
}
return self::$instance;
}
private function __clone()
{
}
}