diff --git a/app/Amqp/Consumer/ChatMessageConsumer.php b/app/Amqp/Consumer/ChatMessageConsumer.php index 2902716..97b396d 100644 --- a/app/Amqp/Consumer/ChatMessageConsumer.php +++ b/app/Amqp/Consumer/ChatMessageConsumer.php @@ -7,6 +7,7 @@ namespace App\Amqp\Consumer; use Hyperf\Amqp\Result; use Hyperf\Amqp\Annotation\Consumer; use Hyperf\Amqp\Message\ConsumerMessage; +use Hyperf\Redis\Redis; use PhpAmqpLib\Message\AMQPMessage; use Hyperf\Amqp\Message\Type; use Hyperf\Amqp\Builder\QueueBuilder; @@ -66,23 +67,21 @@ class ChatMessageConsumer extends ConsumerMessage */ public function consumeMessage($data, AMQPMessage $message): string { - echo PHP_EOL . $data; + $redis = container()->get(Redis::class); + + //[加锁]防止消息重复消费 + $lockName = sprintf('ws:message-lock:%s:%s', SERVER_RUN_ID, $data['uuid']); + if (!$redis->rawCommand('SET', $lockName, 1, 'NX', 'EX', 120)) { + return Result::ACK; + } $server = server(); foreach (server()->connections as $fd) { if ($server->isEstablished($fd)) { - $server->push($fd, "Recv: 我是后台进程 [{$data}]"); + $server->push($fd, "Recv: 我是后台进程 [{$data['message']}]"); } } - return Result::NACK; - } - - /** - * @param $data - */ - public function getClientFds($data) - { - + return Result::ACK; } } diff --git a/app/Amqp/Producer/ChatMessageProducer.php b/app/Amqp/Producer/ChatMessageProducer.php index ab49942..890c018 100644 --- a/app/Amqp/Producer/ChatMessageProducer.php +++ b/app/Amqp/Producer/ChatMessageProducer.php @@ -22,10 +22,10 @@ class ChatMessageProducer extends ProducerMessage 'sender' => '', //发送者ID 'receive' => '', //接收者ID 'receiveType' => '', //接收者类型 1:好友;2:群组 - 'message' => [] + 'message' => $data ]; - $this->payload = $data; + $this->payload = $message; } /** @@ -35,6 +35,6 @@ class ChatMessageProducer extends ProducerMessage */ private function uuid() { - return Str::random() . rand(100000, 999999); + return Str::random() . rand(100000, 999999).uniqid(); } } diff --git a/app/Bootstrap/ServerStart.php b/app/Bootstrap/ServerStart.php index d2d4f73..12613d5 100644 --- a/app/Bootstrap/ServerStart.php +++ b/app/Bootstrap/ServerStart.php @@ -1,12 +1,12 @@ encode(time() . rand(1000, 9999))); - - $this->socketFDService->removeRedisCache(); - - + define('SERVER_RUN_ID', uniqid()); stdout_log()->info(sprintf('服务运行ID : %s', SERVER_RUN_ID)); - stdout_log()->info('服务启动前回调事件 : beforeStart ...'); + + $this->timer(); + Timer::tick(15000, function () { + $this->timer(); + }); } -} \ No newline at end of file + + public function timer() + { + container()->get(Redis::class)->hset('SERVER_RUN_ID', SERVER_RUN_ID, time()); + } +} diff --git a/app/Command/RemoveWsCacheCommand.php b/app/Command/RemoveWsCacheCommand.php new file mode 100644 index 0000000..0f815a3 --- /dev/null +++ b/app/Command/RemoveWsCacheCommand.php @@ -0,0 +1,47 @@ +container = $container; + + parent::__construct('ws:remove-cache'); + } + + public function configure() + { + parent::configure(); + $this->setDescription('清除 WebSocket 客户端 FD 与用户绑定的缓存信息'); + } + + public function handle() + { + $socket = new SocketFDService(); + + $arr= $socket->getServerRunIdAll(2); + + foreach ($arr as $run_id=>$value){ + $socket->removeRedisCache($run_id); + } + + $this->line('缓存已清除!', 'info'); + } +} diff --git a/app/Service/SocketFDService.php b/app/Service/SocketFDService.php index 34f140a..190d16b 100644 --- a/app/Service/SocketFDService.php +++ b/app/Service/SocketFDService.php @@ -2,7 +2,6 @@ namespace App\Service; -use Hyperf\Di\Annotation\Inject; use Hyperf\Redis\Redis; /** @@ -15,19 +14,23 @@ class SocketFDService /** * fd与用户绑定(使用hash 做处理) */ - const BIND_FD_TO_USER = 'socket:fd:user'; + const BIND_FD_TO_USER = 'ws:fd:user'; /** * 使用集合做处理 */ - const BIND_USER_TO_FDS = 'socket:user:fds'; + const BIND_USER_TO_FDS = 'ws:user:fds'; /** - * @inject * @var Redis */ private $redis; + public function __construct() + { + $this->redis = container()->get(Redis::class); + } + /** * 客户端fd与用户ID绑定关系 * @@ -59,7 +62,7 @@ class SocketFDService } /** - * 检测用户当前是否在线 + * 检测用户当前是否在线(指定运行服务器) * * @param int $user_id 用户ID * @param string $run_id 服务运行ID(默认当前服务ID) @@ -70,6 +73,24 @@ class SocketFDService return $this->redis->scard(sprintf('%s:%s:%s', self::BIND_USER_TO_FDS, $run_id, $user_id)) ? true : false; } + /** + * 检测用户当前是否在线(查询所有在线服务器) + * + * @param int $user_id 用户ID + * @param array $run_ids 服务运行ID + * @return bool + */ + public function isOnlineAll(int $user_id, array $run_ids = []) + { + if (empty($run_ids)) $run_ids = $this->getServerRunIdAll(); + + foreach ($run_ids as $run_id) { + if ($this->isOnline($user_id, $run_id)) return true; + } + + return false; + } + /** * 查询客户端fd对应的用户ID * @@ -91,27 +112,50 @@ class SocketFDService */ public function findUserFds(int $user_id, $run_id = SERVER_RUN_ID) { - return ''; + return $this->redis->smembers(sprintf('%s:%s:%s', self::BIND_USER_TO_FDS, $run_id, $user_id)); + } + + /** + * 获取服务ID列表 + * + * @param int $type 获取类型[1:正在运行;2:已超时;3:所有] + * @return array + */ + public function getServerRunIdAll(int $type = 1) + { + $arr = $this->redis->hGetAll('SERVER_RUN_ID'); + if ($type == 3) return $arr; + + $current_time = time(); + return array_filter($arr, function ($value) use ($current_time, $type) { + if ($type == 1) { + return ($current_time - intval($value)) <= 35; + } else { + return ($current_time - intval($value)) > 35; + } + }); } /** * 清除绑定缓存的信息 * - * @param string $run_id 服务运行ID(默认当前服务ID) + * @param string $run_id 服务运行ID */ - public function removeRedisCache($run_id = SERVER_RUN_ID) + public function removeRedisCache(string $run_id) { - $this->redis->del(self::BIND_FD_TO_USER); - $prefix = self::BIND_USER_TO_FDS; + $this->redis->del(sprintf('%s:%s', self::BIND_FD_TO_USER, $run_id)); + + $prefix = sprintf('%s:%s', self::BIND_USER_TO_FDS, $run_id); + $iterator = null; while (true) { $keys = $this->redis->scan($iterator, "{$prefix}*"); - if ($keys === false) { - return; - } + + if ($keys === false) return; + if (!empty($keys)) { $this->redis->del(...$keys); } } } -} \ No newline at end of file +} diff --git a/app/Service/SocketRoomService.php b/app/Service/SocketRoomService.php new file mode 100644 index 0000000..49e9f29 --- /dev/null +++ b/app/Service/SocketRoomService.php @@ -0,0 +1,9 @@ +set(self::getLockKey($key), $requestId, 'NX', 'EX', $lockSecond); + $acquired = $redis->rawCommand('SET', self::getLockKey($key), $requestId, 'NX', 'EX', $lockSecond); if ($acquired) { break; } diff --git a/app/helper.php b/app/helper.php index 328a3bd..21fb819 100644 --- a/app/helper.php +++ b/app/helper.php @@ -9,7 +9,7 @@ use Psr\Http\Message\ServerRequestInterface; use Swoole\Websocket\Frame; use Swoole\WebSocket\Server as WebSocketServer; use Hyperf\Utils\Str; - +use Hyperf\Redis\Redis; /** * 容器实例 @@ -165,4 +165,4 @@ function get_media_url(string $path) function create_image_name(string $ext, int $width, int $height) { return uniqid() . Str::random(18) . uniqid() . '_' . $width . 'x' . $height . '.' . $ext; -} \ No newline at end of file +} diff --git a/config/autoload/server.php b/config/autoload/server.php index 296b706..168ba44 100644 --- a/config/autoload/server.php +++ b/config/autoload/server.php @@ -54,7 +54,6 @@ return [ 'callbacks' => [ //自定义启动前事件 SwooleEvent::ON_BEFORE_START => [App\Bootstrap\ServerStart::class, 'beforeStart'], - SwooleEvent::ON_WORKER_START => [Hyperf\Framework\Bootstrap\WorkerStartCallback::class, 'onWorkerStart'], SwooleEvent::ON_PIPE_MESSAGE => [Hyperf\Framework\Bootstrap\PipeMessageCallback::class, 'onPipeMessage'], SwooleEvent::ON_WORKER_EXIT => [Hyperf\Framework\Bootstrap\WorkerExitCallback::class, 'onWorkerExit'], diff --git a/config/config.php b/config/config.php index 98c28d5..8982aa1 100644 --- a/config/config.php +++ b/config/config.php @@ -19,6 +19,8 @@ return [ 'ip_address'=>env('IP_ADDRESS', ''), + // 运行模式 + 'run_mode'=>'cluster', StdoutLoggerInterface::class => [ 'log_level' => [ LogLevel::ALERT,