From 8ce33cada3b1b2af081b43682f1ad9a75c5de38b Mon Sep 17 00:00:00 2001 From: gzydong <837215079@qq.com> Date: Sun, 29 Nov 2020 14:44:11 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/Amqp/Consumer/ChatMessageConsumer.php | 104 +++++++++--------- app/Amqp/Producer/ChatMessageProducer.php | 25 ++--- app/Bootstrap/ServerStart.php | 11 +- app/Command/RemoveWsCacheCommand.php | 15 +-- app/Constants/ResponseCode.php | 7 -- app/Controller/AbstractController.php | 2 +- app/Controller/Api/V1/ArticleController.php | 6 +- app/Controller/Api/V1/AuthController.php | 10 +- app/Controller/Api/V1/CController.php | 2 +- app/Controller/Api/V1/EmoticonController.php | 6 +- app/Controller/Api/V1/GroupController.php | 8 +- app/Controller/Api/V1/TalkController.php | 26 ++--- app/Controller/Api/V1/UploadController.php | 4 +- app/Controller/Api/V1/UsersController.php | 20 ++-- app/Controller/IndexController.php | 7 -- app/Controller/WebSocketController.php | 27 ++--- app/Exception/Handler/AppExceptionHandler.php | 8 +- app/Helper/PushMessageHelper.php | 11 -- app/Process/AsyncQueueConsumer.php | 33 ------ app/Service/ArticleService.php | 7 +- app/Service/MessageHandleService.php | 33 +++--- ...tFDService.php => SocketClientService.php} | 7 +- app/Service/TalkService.php | 2 +- app/Traits/WebSocketTrait.php | 28 ----- app/helper.php | 7 +- 25 files changed, 169 insertions(+), 247 deletions(-) rename app/Service/{SocketFDService.php => SocketClientService.php} (95%) delete mode 100644 app/Traits/WebSocketTrait.php diff --git a/app/Amqp/Consumer/ChatMessageConsumer.php b/app/Amqp/Consumer/ChatMessageConsumer.php index cdb5b28..c5bde1d 100644 --- a/app/Amqp/Consumer/ChatMessageConsumer.php +++ b/app/Amqp/Consumer/ChatMessageConsumer.php @@ -4,8 +4,6 @@ declare(strict_types=1); namespace App\Amqp\Consumer; -use App\Model\Chat\ChatRecordsForward; -use App\Model\UsersFriend; use Hyperf\Amqp\Annotation\Consumer; use Hyperf\Amqp\Result; use Hyperf\Amqp\Message\ConsumerMessage; @@ -14,18 +12,20 @@ use Hyperf\Amqp\Message\Type; use Hyperf\Amqp\Builder\QueueBuilder; use PhpAmqpLib\Message\AMQPMessage; use App\Model\User; -use App\Helper\PushMessageHelper; +use App\Model\UsersFriend; use App\Model\Chat\ChatRecord; use App\Model\Chat\ChatRecordsCode; use App\Model\Chat\ChatRecordsFile; use App\Model\Chat\ChatRecordsInvite; -use App\Service\SocketFDService; +use App\Model\Chat\ChatRecordsForward; +use App\Service\SocketClientService; use App\Service\SocketRoomService; +use App\Helper\PushMessageHelper; /** * 消息推送消费者队列 * - * @Consumer(name="聊天消息消费者",enable=true) + * @Consumer(name="ConsumerChat",enable=true) */ class ChatMessageConsumer extends ConsumerMessage { @@ -51,9 +51,9 @@ class ChatMessageConsumer extends ConsumerMessage public $routingKey = 'consumer:im:message'; /** - * @var SocketFDService + * @var SocketClientService */ - private $socketFDService; + private $socketClientService; /** * @var SocketRoomService @@ -82,14 +82,14 @@ class ChatMessageConsumer extends ConsumerMessage /** * ChatMessageConsumer constructor. - * @param SocketFDService $socketFDService + * @param SocketClientService $socketClientService * @param SocketRoomService $socketRoomService */ - public function __construct(SocketFDService $socketFDService, SocketRoomService $socketRoomService) + public function __construct(SocketClientService $socketClientService, SocketRoomService $socketRoomService) { - $this->socketFDService = $socketFDService; + $this->socketClientService = $socketClientService; $this->socketRoomService = $socketRoomService; - $this->setQueue('queue:im-message:' . SERVER_RUN_ID); + $this->setQueue('queue:im:message:' . SERVER_RUN_ID); } /** @@ -139,19 +139,19 @@ class ChatMessageConsumer extends ConsumerMessage $source = $data['data']['source']; - $fids = $this->socketFDService->findUserFds($data['data']['sender']); + $fds = $this->socketClientService->findUserFds($data['data']['sender']); if ($source == 1) {// 私聊 - $fids = array_merge($fids, $this->socketFDService->findUserFds($data['data']['receive'])); + $fds = array_merge($fds, $this->socketClientService->findUserFds($data['data']['receive'])); } else if ($source == 2) {//群聊 $userIds = $this->socketRoomService->getRoomMembers(strval($data['data']['receive'])); foreach ($userIds as $uid) { - $fids = array_merge($fids, $this->socketFDService->findUserFds(intval($uid))); + $fds = array_merge($fds, $this->socketClientService->findUserFds(intval($uid))); } } // 去重 - $fids = array_unique($fids); - if (empty($fids)) { + $fds = array_unique($fds); + if (empty($fds)) { return Result::ACK; } @@ -245,12 +245,12 @@ class ChatMessageConsumer extends ConsumerMessage ]) ]; - $server = server(); - foreach ($fids as $fd) { - $fd = intval($fd); - if ($server->exist($fd)) { - $server->push($fd, json_encode(['event_talk', $msg])); - } + unset($result, $file, $code_block, $forward, $invite); + + $server = websocket(); + $notify = json_encode(['event_talk', $msg]); + foreach ($fds as $fd) { + $server->exist($fd) && $server->push($fd, $notify); } return Result::ACK; @@ -265,13 +265,11 @@ class ChatMessageConsumer extends ConsumerMessage */ public function onConsumeKeyboard(array $data, AMQPMessage $message) { - $fds = $this->socketFDService->findUserFds($data['data']['receive_user']); - $server = server(); + $fds = $this->socketClientService->findUserFds($data['data']['receive_user']); + $server = websocket(); + $notify = json_encode(['event_keyboard', $data['data']]); foreach ($fds as $fd) { - $fd = intval($fd); - if ($server->exist($fd)) { - $server->push($fd, json_encode(['event_keyboard', $data['data']])); - } + $server->exist($fd) && $server->push($fd, $notify); } return Result::ACK; @@ -291,16 +289,14 @@ class ChatMessageConsumer extends ConsumerMessage $fds = []; foreach ($friends as $friend_id) { - $fds = array_merge($fds, $this->socketFDService->findUserFds(intval($friend_id))); + $fds = array_merge($fds, $this->socketClientService->findUserFds(intval($friend_id))); } $fds = array_unique($fds); - $server = server(); + $server = websocket(); + $notify = json_encode(['event_online_status', $data['data']]); foreach ($fds as $fd) { - $fd = intval($fd); - if ($server->exist($fd)) { - $server->push($fd, json_encode(['event_online_status', $data['data']])); - } + $server->exist($fd) && $server->push($fd, $notify); } return Result::ACK; @@ -320,27 +316,26 @@ class ChatMessageConsumer extends ConsumerMessage $fds = []; if ($record->source == 1) { - $fds = array_merge($fds, $this->socketFDService->findUserFds($record->user_id)); - $fds = array_merge($fds, $this->socketFDService->findUserFds($record->receive_id)); + $fds = array_merge($fds, $this->socketClientService->findUserFds((int)$record->user_id)); + $fds = array_merge($fds, $this->socketClientService->findUserFds((int)$record->receive_id)); } else if ($record->source == 2) { $userIds = $this->socketRoomService->getRoomMembers(strval($record->receive_id)); foreach ($userIds as $uid) { - $fds = array_merge($fds, $this->socketFDService->findUserFds(intval($uid))); + $fds = array_merge($fds, $this->socketClientService->findUserFds((int)$uid)); } } $fds = array_unique($fds); - $server = server(); + $server = websocket(); + $notify = json_encode(['event_revoke_talk', [ + 'record_id' => $record->id, + 'source' => $record->source, + 'user_id' => $record->user_id, + 'receive_id' => $record->receive_id, + ]]); + foreach ($fds as $fd) { - $fd = intval($fd); - if ($server->exist($fd)) { - $server->push($fd, json_encode(['event_revoke_talk', [ - 'record_id' => $record->id, - 'source' => $record->source, - 'user_id' => $record->user_id, - 'receive_id' => $record->receive_id, - ]])); - } + $server->exist($fd) && $server->push($fd, $notify); } return Result::ACK; @@ -349,20 +344,19 @@ class ChatMessageConsumer extends ConsumerMessage /** * 好友申请消息 * - * @param array $data + * @param array $data 队列消息 * @param AMQPMessage $message + * @return string */ public function onConsumeFriendApply(array $data, AMQPMessage $message) { - $fds = $this->socketFDService->findUserFds($data['data']['receive']); - + $fds = $this->socketClientService->findUserFds($data['data']['receive']); $fds = array_unique($fds); - $server = server(); + + $server = websocket(); + $notify = json_encode(['event_friend_apply', $data['data']]); foreach ($fds as $fd) { - $fd = intval($fd); - if ($server->exist($fd)) { - $server->push($fd, json_encode(['event_friend_apply', $data['data']])); - } + $server->exist($fd) && $server->push($fd, $notify); } return Result::ACK; diff --git a/app/Amqp/Producer/ChatMessageProducer.php b/app/Amqp/Producer/ChatMessageProducer.php index 48e76fb..4e36a23 100644 --- a/app/Amqp/Producer/ChatMessageProducer.php +++ b/app/Amqp/Producer/ChatMessageProducer.php @@ -8,19 +8,18 @@ use Hyperf\Amqp\Message\ProducerMessage; use Hyperf\Amqp\Message\Type; use Hyperf\Utils\Str; +/** + * 消息生产者 + * + * @package App\Amqp\Producer + */ class ChatMessageProducer extends ProducerMessage { - public $exchange = 'im.message.fanout'; - + // 交换机类型 public $type = Type::FANOUT; - const EVENTS = [ - 'event_talk', - 'event_keyboard', - 'event_online_status', - 'event_revoke_talk', - 'event_friend_apply' - ]; + // 交换机名称 + public $exchange = 'im.message.fanout'; /** * 初始化处理... @@ -31,10 +30,6 @@ class ChatMessageProducer extends ProducerMessage */ public function __construct(string $event, array $data, array $options = []) { - if (!in_array($event, self::EVENTS)) { - new \Exception('事件名未注册...'); - } - $message = [ 'uuid' => $this->uuid(),// 自定义消息ID 'event' => $event, @@ -46,12 +41,12 @@ class ChatMessageProducer extends ProducerMessage } /** - * 生成唯一ID + * 生成唯一的消息ID * * @return string */ private function uuid() { - return Str::random() . rand(100000, 999999) . uniqid(); + return Str::random() . mt_rand(100000, 999999) . uniqid(); } } diff --git a/app/Bootstrap/ServerStart.php b/app/Bootstrap/ServerStart.php index 0d53ca9..eb0142e 100644 --- a/app/Bootstrap/ServerStart.php +++ b/app/Bootstrap/ServerStart.php @@ -2,8 +2,6 @@ namespace App\Bootstrap; -use App\Support\Packet; -use App\Support\SocketIOParser; use Hyperf\Framework\Bootstrap\ServerStartCallback; use Swoole\Timer; use Hyperf\Redis\Redis; @@ -24,13 +22,16 @@ class ServerStart extends ServerStartCallback stdout_log()->info(sprintf('服务运行ID : %s', SERVER_RUN_ID)); // 维护服务运行状态 - $this->setTimeOut(); + $this->setRunIdTime(); Timer::tick(15000, function () { - $this->setTimeOut(); + $this->setRunIdTime(); }); } - public function setTimeOut() + /** + * 更新运行ID时间 + */ + public function setRunIdTime() { container()->get(Redis::class)->hset('SERVER_RUN_ID', SERVER_RUN_ID, time()); } diff --git a/app/Command/RemoveWsCacheCommand.php b/app/Command/RemoveWsCacheCommand.php index 0f815a3..4dec33c 100644 --- a/app/Command/RemoveWsCacheCommand.php +++ b/app/Command/RemoveWsCacheCommand.php @@ -4,10 +4,10 @@ declare(strict_types=1); namespace App\Command; -use App\Service\SocketFDService; use Hyperf\Command\Command as HyperfCommand; use Hyperf\Command\Annotation\Command; use Psr\Container\ContainerInterface; +use App\Service\SocketClientService; /** * @Command @@ -34,12 +34,13 @@ class RemoveWsCacheCommand extends HyperfCommand public function handle() { - $socket = new SocketFDService(); - - $arr= $socket->getServerRunIdAll(2); - - foreach ($arr as $run_id=>$value){ - $socket->removeRedisCache($run_id); + $socket = new SocketClientService(); + $this->line('此过程可能耗时较长,请耐心等待!', 'info'); + $arr = $socket->getServerRunIdAll(2); + foreach ($arr as $run_id => $value) { + go(function () use ($socket, $run_id) { + $socket->removeRedisCache($run_id); + }); } $this->line('缓存已清除!', 'info'); diff --git a/app/Constants/ResponseCode.php b/app/Constants/ResponseCode.php index a25ed3d..8f1e02b 100644 --- a/app/Constants/ResponseCode.php +++ b/app/Constants/ResponseCode.php @@ -1,12 +1,5 @@ talkService->revokeRecord($this->uid(), $params['record_id']); - if($isTrue){ + if ($isTrue) { $this->producer->produce( new ChatMessageProducer('event_revoke_talk', [ 'record_id' => $params['record_id'] @@ -379,7 +379,7 @@ class TalkController extends CController return $this->response->success([ 'rows' => $result, - 'record_id' => $result ? $result[count($result) - 1]['id'] : 0, + 'record_id' => $result ? end($result)['id'] : 0, 'limit' => $limit ]); } @@ -448,7 +448,7 @@ class TalkController extends CController return $this->response->success([ 'rows' => $result, - 'record_id' => $result ? $result[count($result) - 1]['id'] : 0, + 'record_id' => $result ? end($result)['id'] : 0, 'limit' => $limit ]); } diff --git a/app/Controller/Api/V1/UploadController.php b/app/Controller/Api/V1/UploadController.php index b394a83..2113749 100644 --- a/app/Controller/Api/V1/UploadController.php +++ b/app/Controller/Api/V1/UploadController.php @@ -3,13 +3,13 @@ declare(strict_types=1); namespace App\Controller\Api\V1; -use App\Service\SplitUploadService; -use App\Service\UploadService; use Hyperf\Di\Annotation\Inject; use Hyperf\HttpServer\Annotation\Controller; use Hyperf\HttpServer\Annotation\RequestMapping; use Hyperf\HttpServer\Annotation\Middleware; use App\Middleware\JWTAuthMiddleware; +use App\Service\SplitUploadService; +use App\Service\UploadService; /** * 上传控制器 diff --git a/app/Controller/Api/V1/UsersController.php b/app/Controller/Api/V1/UsersController.php index ca74d79..4b2fd3c 100644 --- a/app/Controller/Api/V1/UsersController.php +++ b/app/Controller/Api/V1/UsersController.php @@ -2,24 +2,24 @@ namespace App\Controller\Api\V1; -use App\Cache\ApplyNumCache; use Hyperf\Di\Annotation\Inject; use Hyperf\HttpServer\Annotation\Controller; use Hyperf\HttpServer\Annotation\RequestMapping; use Hyperf\HttpServer\Annotation\Middleware; use App\Middleware\JWTAuthMiddleware; +use Hyperf\Amqp\Producer; use App\Constants\ResponseCode; use App\Helper\Hash; use App\Model\User; use App\Model\UsersChatList; use App\Model\UsersFriend; -use App\Service\SmsCodeService; use App\Support\SendEmailCode; use App\Service\FriendService; use App\Service\UserService; -use App\Service\SocketFDService; +use App\Service\SocketClientService; +use App\Service\SmsCodeService; use App\Amqp\Producer\ChatMessageProducer; -use Hyperf\Amqp\Producer; +use App\Cache\ApplyNumCache; /** * Class UsersController @@ -45,9 +45,9 @@ class UsersController extends CController /** * @inject - * @var SocketFDService + * @var SocketClientService */ - private $socketFDService; + private $socketClientService; /** * @Inject @@ -64,9 +64,9 @@ class UsersController extends CController { $rows = UsersFriend::getUserFriends($this->uid()); - $runArr = $this->socketFDService->getServerRunIdAll(); + $runArr = $this->socketClientService->getServerRunIdAll(); foreach ($rows as $k => $row) { - $rows[$k]['online'] = $this->socketFDService->isOnlineAll($row['id'], $runArr); + $rows[$k]['online'] = $this->socketClientService->isOnlineAll($row['id'], $runArr); } return $this->response->success($rows); @@ -264,7 +264,7 @@ class UsersController extends CController ApplyNumCache::setInc((int)$params['friend_id']); //判断对方是否在线。如果在线发送消息通知 - if ($this->socketFDService->isOnlineAll((int)$params['friend_id'])) { + if ($this->socketClientService->isOnlineAll((int)$params['friend_id'])) { $this->producer->produce( new ChatMessageProducer('event_friend_apply', [ 'sender' => $user_id, @@ -299,7 +299,7 @@ class UsersController extends CController } //判断对方是否在线。如果在线发送消息通知 - if ($this->socketFDService->isOnlineAll((int)$params['friend_id'])) { + if ($this->socketClientService->isOnlineAll((int)$params['friend_id'])) { // $this->producer->produce( // new ChatMessageProducer('event_friend_apply', [ // 'sender' => $user_id, diff --git a/app/Controller/IndexController.php b/app/Controller/IndexController.php index fbb7d93..8dad321 100644 --- a/app/Controller/IndexController.php +++ b/app/Controller/IndexController.php @@ -27,11 +27,4 @@ class IndexController extends AbstractController 'message' => "Hello {$user}.", ]; } - - public function upload(ResponseInterface $response) - { - return [ - 'method' => 'upload', - ]; - } } diff --git a/app/Controller/WebSocketController.php b/app/Controller/WebSocketController.php index 34666b6..ee33d4d 100644 --- a/app/Controller/WebSocketController.php +++ b/app/Controller/WebSocketController.php @@ -7,14 +7,13 @@ use Hyperf\Di\Annotation\Inject; use Hyperf\Contract\OnCloseInterface; use Hyperf\Contract\OnMessageInterface; use Hyperf\Contract\OnOpenInterface; -use Phper666\JWTAuth\JWT; +use Hyperf\Amqp\Producer; use Swoole\Http\Request; use Swoole\Websocket\Frame; -use Hyperf\Amqp\Producer; use Swoole\Http\Response; use Swoole\WebSocket\Server; -use App\Traits\WebSocketTrait; -use App\Service\SocketFDService; +use Phper666\JWTAuth\JWT; +use App\Service\SocketClientService; use App\Service\MessageHandleService; use App\Service\SocketRoomService; use App\Model\Group\UsersGroupMember; @@ -26,8 +25,6 @@ use App\Amqp\Producer\ChatMessageProducer; */ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface { - use WebSocketTrait; - /** * @Inject * @var JWT @@ -42,9 +39,9 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos /** * @inject - * @var SocketFDService + * @var SocketClientService */ - private $socketFDService; + private $socketClientService; /** * @inject @@ -58,6 +55,7 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos */ private $messageHandleService; + // 消息事件绑定 const EVENTS = [ 'event_talk' => 'onTalk', 'event_keyboard' => 'onKeyboard', @@ -76,10 +74,10 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos stdout_log()->notice("用户连接信息 : user_id:{$userInfo['user_id']} | fd:{$request->fd} 时间:" . date('Y-m-d H:i:s')); // 判断是否存在异地登录 - $isOnline = $this->socketFDService->isOnlineAll(intval($userInfo['user_id'])); + $isOnline = $this->socketClientService->isOnlineAll(intval($userInfo['user_id'])); // 绑定fd与用户关系 - $this->socketFDService->bindRelation($request->fd, $userInfo['user_id']); + $this->socketClientService->bindRelation($request->fd, $userInfo['user_id']); // 加入群聊 $groupIds = UsersGroupMember::getUserGroupIds($userInfo['user_id']); @@ -113,7 +111,7 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos [$event, $data] = array_values(json_decode($frame->data, true)); if (isset(self::EVENTS[$event])) { - $this->messageHandleService->{self::EVENTS[$event]}($server, $frame, $data); + call_user_func_array([$this->messageHandleService, self::EVENTS[$event]], [$server, $frame, $data]); return; } } @@ -127,16 +125,15 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos */ public function onClose($server, int $fd, int $reactorId): void { - $user_id = $this->socketFDService->findFdUserId($fd); + $user_id = $this->socketClientService->findFdUserId($fd); stdout_log()->notice("客户端FD:{$fd} 已关闭连接 ,用户ID为【{$user_id}】,关闭时间:" . date('Y-m-d H:i:s')); // 解除fd关系 - $this->socketFDService->removeRelation($fd); + $this->socketClientService->removeRelation($fd); // 判断是否存在异地登录 - $isOnline = $this->socketFDService->isOnlineAll(intval($user_id)); - // ... 不存在异地登录,推送下线通知消息 + $isOnline = $this->socketClientService->isOnlineAll(intval($user_id)); if (!$isOnline) { // 推送消息至队列 $this->producer->produce( diff --git a/app/Exception/Handler/AppExceptionHandler.php b/app/Exception/Handler/AppExceptionHandler.php index c36a66d..4dbfea5 100644 --- a/app/Exception/Handler/AppExceptionHandler.php +++ b/app/Exception/Handler/AppExceptionHandler.php @@ -4,6 +4,7 @@ declare(strict_types=1); namespace App\Exception\Handler; +use App\Constants\ResponseCode; use Hyperf\Contract\StdoutLoggerInterface; use Hyperf\ExceptionHandler\ExceptionHandler; use Hyperf\HttpMessage\Stream\SwooleStream; @@ -27,7 +28,12 @@ class AppExceptionHandler extends ExceptionHandler $this->logger->error(sprintf('%s[%s] in %s', $throwable->getMessage(), $throwable->getLine(), $throwable->getFile())); $this->logger->error($throwable->getTraceAsString()); - return $response->withHeader('Server', 'Hyperf')->withStatus(500)->withBody(new SwooleStream('Internal Server Error.')); + $data = json_encode([ + 'code' => ResponseCode::SERVER_ERROR, + 'message' => 'Internal Server Error.' + ], JSON_UNESCAPED_UNICODE); + + return $response->withHeader('Server', 'Hyperf')->withStatus(500)->withBody(new SwooleStream($data)); } /** diff --git a/app/Helper/PushMessageHelper.php b/app/Helper/PushMessageHelper.php index 498a0b5..e5293c0 100644 --- a/app/Helper/PushMessageHelper.php +++ b/app/Helper/PushMessageHelper.php @@ -2,7 +2,6 @@ namespace App\Helper; - use App\Model\User; /** @@ -12,16 +11,6 @@ use App\Model\User; */ class PushMessageHelper { - // 消息事件类型 - const events = [ - 'chat_message',//用户聊天消息 - 'friend_apply',//好友添加申请消息 - 'join_group', //入群消息 - 'login_notify',//好友登录消息通知 - 'input_tip',//好友登录消息通知 - 'revoke_records',//好友撤回消息通知 - ]; - /** * 格式化对话的消息体 * diff --git a/app/Process/AsyncQueueConsumer.php b/app/Process/AsyncQueueConsumer.php index 4a6f861..b2f0318 100644 --- a/app/Process/AsyncQueueConsumer.php +++ b/app/Process/AsyncQueueConsumer.php @@ -14,9 +14,6 @@ namespace App\Process; use Hyperf\AsyncQueue\Process\ConsumerProcess; use Hyperf\Process\Annotation\Process; -use PhpAmqpLib\Connection\AMQPStreamConnection; -use PhpAmqpLib\Message\AMQPMessage; - ///** // * @Process // */ @@ -24,36 +21,6 @@ class AsyncQueueConsumer extends ConsumerProcess { public function handle(): void { - //建立一个到RabbitMQ服务器的连接 - $connection = new AMQPStreamConnection('47.105.180.123', 5672, 'yuandong', 'yuandong','im'); - $channel = $connection->channel(); - - $channel->exchange_declare('im.message.fanout', 'fanout', true, false, false); - - list($queue_name, ,) = $channel->queue_declare("test", false, false, true, true); - - $channel->queue_bind($queue_name, 'im.message.fanout','routing-key-test'); - - echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; - - $callback = function($msg){ - $data = json_decode($msg->body,true); - $server = server(); - foreach ($server->connections as $fd) { - if ($server->exist($fd) && $server->isEstablished($fd)) { - $server->push($fd, "Recv: 我是后台进程 [{$data['message']}]"); - } - } - }; - - $channel->basic_consume($queue_name, 'asfafa', false, true, false, false, $callback); - - while(count($channel->callbacks)) { - $channel->wait(); - } - - $channel->close(); - $connection->close(); } } diff --git a/app/Service/ArticleService.php b/app/Service/ArticleService.php index 6afda8c..14e698a 100644 --- a/app/Service/ArticleService.php +++ b/app/Service/ArticleService.php @@ -11,6 +11,11 @@ use App\Traits\PagingTrait; use Hyperf\DbConnection\Db; use Exception; +/** + * 笔记服务层 + * + * @package App\Service + */ class ArticleService extends BaseService { use PagingTrait; @@ -41,7 +46,7 @@ class ArticleService extends BaseService */ public function getUserTags(int $user_id) { - $items = ArticleTag::where('user_id', $user_id)->orderBy('id', 'desc')->get(['id', 'tag_name',Db::raw('0 as count')])->toArray(); + $items = ArticleTag::where('user_id', $user_id)->orderBy('id', 'desc')->get(['id', 'tag_name', Db::raw('0 as count')])->toArray(); array_walk($items, function ($item) use ($user_id) { $item['count'] = (int)Article::where('user_id', $user_id)->whereRaw("FIND_IN_SET({$item['id']},tags_id)")->count(); }); diff --git a/app/Service/MessageHandleService.php b/app/Service/MessageHandleService.php index e2d0e64..0a41915 100644 --- a/app/Service/MessageHandleService.php +++ b/app/Service/MessageHandleService.php @@ -1,19 +1,18 @@ socketFDService->findFdUserId($frame->fd); - + $user_id = $this->socketClientService->findFdUserId($frame->fd); if ($user_id != $data['send_user']) { return; } @@ -73,7 +76,9 @@ class MessageHandleService 'created_at' => date('Y-m-d H:i:s'), ]); - if (!$result) return; + if (!$result) { + return; + } // 判断是否私聊 if ($data['source_type'] == 1) { @@ -85,7 +90,7 @@ class MessageHandleService ], intval($data['receive_user']), intval($data['send_user'])); // 设置好友消息未读数 - make(UnreadTalkCache::class)->setInc(intval($result->receive_id), strval($result->user_id)); + $this->unreadTalkCache->setInc(intval($result->receive_id), strval($result->user_id)); } $this->producer->produce( @@ -96,8 +101,6 @@ class MessageHandleService 'record_id' => $result->id ]) ); - - return true; } /** @@ -116,7 +119,5 @@ class MessageHandleService 'receive_user' => intval($data['receive_user']), //接收者ID ]) ); - - return true; } } diff --git a/app/Service/SocketFDService.php b/app/Service/SocketClientService.php similarity index 95% rename from app/Service/SocketFDService.php rename to app/Service/SocketClientService.php index 35c6e31..d03585a 100644 --- a/app/Service/SocketFDService.php +++ b/app/Service/SocketClientService.php @@ -9,7 +9,7 @@ use Hyperf\Redis\Redis; * * @package App\Service */ -class SocketFDService +class SocketClientService { /** * fd与用户绑定(使用hash 做处理) @@ -117,7 +117,10 @@ class SocketFDService */ public function findUserFds(int $user_id, $run_id = SERVER_RUN_ID) { - return $this->redis->smembers(sprintf('%s:%s:%s', self::BIND_USER_TO_FDS, $run_id, $user_id)); + $arr = $this->redis->smembers(sprintf('%s:%s:%s', self::BIND_USER_TO_FDS, $run_id, $user_id)); + return $arr ? array_map(function ($fd) { + return (int)$fd; + }, $arr) : []; } /** diff --git a/app/Service/TalkService.php b/app/Service/TalkService.php index 9354760..9aa8a5b 100644 --- a/app/Service/TalkService.php +++ b/app/Service/TalkService.php @@ -50,7 +50,7 @@ class TalkService extends BaseService } - $socketFDService = make(SocketFDService::class); + $socketFDService = make(SocketClientService::class); $runIdAll = $socketFDService->getServerRunIdAll(); $rows = array_map(function ($item) use ($user_id, $socketFDService, $runIdAll) { diff --git a/app/Traits/WebSocketTrait.php b/app/Traits/WebSocketTrait.php deleted file mode 100644 index 0c096d1..0000000 --- a/app/Traits/WebSocketTrait.php +++ /dev/null @@ -1,28 +0,0 @@ -