diff --git a/app/Amqp/Consumer/ChatMessageConsumer.php b/app/Amqp/Consumer/ChatMessageConsumer.php index c4f1e57..9acf1c6 100644 --- a/app/Amqp/Consumer/ChatMessageConsumer.php +++ b/app/Amqp/Consumer/ChatMessageConsumer.php @@ -43,7 +43,7 @@ class ChatMessageConsumer extends ConsumerMessage */ public function __construct() { - $this->setQueue('im:message:queue:' . config('ip_address')); + $this->setQueue('queue:im-message' . SERVER_RUN_ID); } /** diff --git a/app/Amqp/Producer/ChatMessageProducer.php b/app/Amqp/Producer/ChatMessageProducer.php index 890c018..ced9d52 100644 --- a/app/Amqp/Producer/ChatMessageProducer.php +++ b/app/Amqp/Producer/ChatMessageProducer.php @@ -35,6 +35,6 @@ class ChatMessageProducer extends ProducerMessage */ private function uuid() { - return Str::random() . rand(100000, 999999).uniqid(); + return Str::random() . rand(100000, 999999) . uniqid(); } } diff --git a/app/Bootstrap/ServerStart.php b/app/Bootstrap/ServerStart.php index 12613d5..7140ba5 100644 --- a/app/Bootstrap/ServerStart.php +++ b/app/Bootstrap/ServerStart.php @@ -21,17 +21,15 @@ class ServerStart extends ServerStartCallback */ public function beforeStart() { - // 服务运行ID - define('SERVER_RUN_ID', uniqid()); stdout_log()->info(sprintf('服务运行ID : %s', SERVER_RUN_ID)); - $this->timer(); + $this->setTimeOut(); Timer::tick(15000, function () { - $this->timer(); + $this->setTimeOut(); }); } - public function timer() + public function setTimeOut() { container()->get(Redis::class)->hset('SERVER_RUN_ID', SERVER_RUN_ID, time()); } diff --git a/app/Controller/WebSocketController.php b/app/Controller/WebSocketController.php index df1f6be..9999d0c 100644 --- a/app/Controller/WebSocketController.php +++ b/app/Controller/WebSocketController.php @@ -48,7 +48,7 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos { $token = $request->get['token'] ?? ''; $userInfo = $this->jwt->getParserData($token); - stdout_log()->info("用户连接信息 : user_id:{$userInfo['user_id']} | fd:{$request->fd} | data:" . Json::encode($userInfo)); + stdout_log()->notice("用户连接信息 : user_id:{$userInfo['user_id']} | fd:{$request->fd} | data:" . Json::encode($userInfo)); // 绑定fd与用户关系 $this->socketFDService->bindRelation($request->fd, $userInfo['user_id']); @@ -65,6 +65,9 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos */ public function onMessage($server, Frame $frame): void { + // 判断是否为心跳检测 + if ($frame->data == 'PING') return; + $ip = config('ip_address'); $producer = container()->get(Producer::class); $producer->produce(new ChatMessageProducer("我是来自[{$ip} 服务器的消息],{$frame->data}")); @@ -79,8 +82,13 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos */ public function onClose($server, int $fd, int $reactorId): void { + $user_id = $this->socketFDService->findFdUserId($fd); + + stdout_log()->notice("客户端FD:{$fd} 已关闭连接,用户ID为【{$user_id}】"); + // 解除fd关系 $this->socketFDService->removeRelation($fd); - echo PHP_EOL . "FD : 【{$fd}】 已断开..."; + + // ... 包装推送消息至队列 } } diff --git a/app/Service/SocketFDService.php b/app/Service/SocketFDService.php index 190d16b..a4e5dd0 100644 --- a/app/Service/SocketFDService.php +++ b/app/Service/SocketFDService.php @@ -21,6 +21,11 @@ class SocketFDService */ const BIND_USER_TO_FDS = 'ws:user:fds'; + /** + * 运行检测超时时间(单位秒) + */ + const RUN_OVERTIME = 35; + /** * @var Redis */ @@ -108,7 +113,7 @@ class SocketFDService * * @param int $user_id 用户ID * @param string $run_id 服务运行ID(默认当前服务ID) - * @return string + * @return array */ public function findUserFds(int $user_id, $run_id = SERVER_RUN_ID) { @@ -129,9 +134,9 @@ class SocketFDService $current_time = time(); return array_filter($arr, function ($value) use ($current_time, $type) { if ($type == 1) { - return ($current_time - intval($value)) <= 35; + return ($current_time - intval($value)) <= self::RUN_OVERTIME; } else { - return ($current_time - intval($value)) > 35; + return ($current_time - intval($value)) > self::RUN_OVERTIME; } }); } @@ -146,6 +151,7 @@ class SocketFDService $this->redis->del(sprintf('%s:%s', self::BIND_FD_TO_USER, $run_id)); $prefix = sprintf('%s:%s', self::BIND_USER_TO_FDS, $run_id); + $this->redis->hDel('SERVER_RUN_ID', $run_id); $iterator = null; while (true) { diff --git a/app/Service/SocketRoomService.php b/app/Service/SocketRoomService.php index 49e9f29..dc71e6a 100644 --- a/app/Service/SocketRoomService.php +++ b/app/Service/SocketRoomService.php @@ -5,5 +5,50 @@ namespace App\Service; class SocketRoomService { + const ROOM = 'ws:room'; + + /** + * 获取房间名 + * + * @param string|integer $room 房间名 + * @return string + */ + public function getRoomName($room) + { + return sprintf('%s:%s', self::ROOM, $room); + } + /** + * 获取房间成员 + * + * @param string $room 房间名 + * @return array + */ + public function getRoomMembers(string $room) + { + return redis()->sMembers($this->getRoomName($room)); + } + + /** + * 成员加入房间 + * + * @param int $usr_id 用户ID + * @param string|array $room 房间名 + * @return bool|int + */ + public function addRoomMember(int $usr_id, $room) + { + return redis()->sAdd($this->getRoomName($room), $room); + } + + /** + * 成员退出房间 + * + * @param string|array $room 房间名 + * @param string|array $members 用户ID + */ + public function delRoomMember($room, $members) + { + return redis()->sRem($this->getRoomName($room), $members); + } } diff --git a/bin/hyperf.php b/bin/hyperf.php index 232e968..bb438e8 100644 --- a/bin/hyperf.php +++ b/bin/hyperf.php @@ -12,6 +12,9 @@ date_default_timezone_set('Asia/Shanghai'); require BASE_PATH . '/vendor/autoload.php'; +// 设置服务运行ID +define('SERVER_RUN_ID', uniqid()); + // Self-called anonymous function that creates its own scope and keep the global namespace clean. (function () { Hyperf\Di\ClassLoader::init(); diff --git a/config/autoload/server.php b/config/autoload/server.php index 168ba44..e7c254e 100644 --- a/config/autoload/server.php +++ b/config/autoload/server.php @@ -46,10 +46,14 @@ return [ 'open_tcp_nodelay' => true, 'max_coroutine' => 100000, 'open_http2_protocol' => true, - 'max_request' => 100000, + 'max_request' => 1000, 'socket_buffer_size' => 3 * 1024 * 1024, 'buffer_output_size' => 3 * 1024 * 1024, 'package_max_length'=> 10 * 1024 * 1024, + + //设置心跳检测 + 'heartbeat_idle_time' => 150, + 'heartbeat_check_interval' => 60, ], 'callbacks' => [ //自定义启动前事件