diff --git a/app/Amqp/Consumer/ChatMessageConsumer.php b/app/Amqp/Consumer/ChatMessageConsumer.php index c8d6397..5efe195 100644 --- a/app/Amqp/Consumer/ChatMessageConsumer.php +++ b/app/Amqp/Consumer/ChatMessageConsumer.php @@ -13,6 +13,7 @@ declare(strict_types=1); namespace App\Amqp\Consumer; +use App\Model\Group\Group; use Hyperf\Amqp\Annotation\Consumer; use Hyperf\Amqp\Result; use Hyperf\Amqp\Message\ConsumerMessage; @@ -76,19 +77,19 @@ class ChatMessageConsumer extends ConsumerMessage */ const EVENTS = [ // 聊天消息事件 - SocketConstants::EVENT_TALK => 'onConsumeTalk', + SocketConstants::EVENT_TALK => 'onConsumeTalk', // 键盘输入事件 - SocketConstants::EVENT_KEYBOARD => 'onConsumeKeyboard', + SocketConstants::EVENT_KEYBOARD => 'onConsumeKeyboard', // 用户在线状态事件 SocketConstants::EVENT_ONLINE_STATUS => 'onConsumeOnlineStatus', // 聊天消息推送事件 - SocketConstants::EVENT_REVOKE_TALK => 'onConsumeRevokeTalk', + SocketConstants::EVENT_REVOKE_TALK => 'onConsumeRevokeTalk', // 好友申请相关事件 - SocketConstants::EVENT_FRIEND_APPLY => 'onConsumeFriendApply' + SocketConstants::EVENT_FRIEND_APPLY => 'onConsumeFriendApply' ]; /** @@ -99,7 +100,7 @@ class ChatMessageConsumer extends ConsumerMessage public function __construct(SocketClientService $socketClientService, SocketRoomService $socketRoomService) { $this->socketClientService = $socketClientService; - $this->socketRoomService = $socketRoomService; + $this->socketRoomService = $socketRoomService; // 动态设置 Rabbit MQ 消费队列名称 $this->setQueue('queue:im_message:' . SERVER_RUN_ID); @@ -129,7 +130,7 @@ class ChatMessageConsumer extends ConsumerMessage if (isset($data['event'])) { $redis = container()->get(Redis::class); - //[加锁]防止消息重复消费 + // [加锁]防止消息重复消费 $lockName = sprintf('ws:message-lock:%s:%s', SERVER_RUN_ID, $data['uuid']); if (!$redis->rawCommand('SET', $lockName, 1, 'NX', 'EX', 60)) { return Result::ACK; @@ -151,22 +152,23 @@ class ChatMessageConsumer extends ConsumerMessage */ public function onConsumeTalk(array $data, AMQPMessage $message): string { - $source = $data['data']['source']; + $source = $data['data']['source']; + $fds = $this->socketClientService->findUserFds($data['data']['sender']); + $group_name = ''; - $fds = $this->socketClientService->findUserFds($data['data']['sender']); if ($source == 1) {// 私聊 $fds = array_merge($fds, $this->socketClientService->findUserFds($data['data']['receive'])); - } else if ($source == 2) {//群聊 + } else if ($source == 2) {// 群聊 $userIds = $this->socketRoomService->getRoomMembers(strval($data['data']['receive'])); foreach ($userIds as $uid) { $fds = array_merge($fds, $this->socketClientService->findUserFds((int)$uid)); } + + $group_name = Group::where('id', $data['data']['receive'])->value('group_name'); } // 客户端ID去重 - if (!$fds = array_unique($fds)) { - return Result::ACK; - } + if (!$fds = array_unique($fds)) return Result::ACK; /** * @var ChatRecord @@ -182,75 +184,71 @@ class ChatMessageConsumer extends ConsumerMessage 'chat_records.content', 'chat_records.is_revoke', 'chat_records.created_at', - 'users.nickname', - 'users.avatar as avatar', + 'users.avatar', ]); if (!$result) return Result::ACK; - $file = []; - $code_block = []; - $forward = []; - $invite = []; + $file = $code_block = $forward = $invite = []; + switch ($result->msg_type) { - case 2://文件消息 + case 2:// 文件消息 $file = ChatRecordsFile::where('record_id', $result->id)->first(['id', 'record_id', 'user_id', 'file_source', 'file_type', 'save_type', 'original_name', 'file_suffix', 'file_size', 'save_dir']); $file = $file ? $file->toArray() : []; $file && $file['file_url'] = get_media_url($file['save_dir']); break; - case 3://入群消息/退群消息 + case 3:// 入群消息/退群消息 $notifyInfo = ChatRecordsInvite::where('record_id', $result->id)->first([ 'record_id', 'type', 'operate_user_id', 'user_ids' ]); $userInfo = User::where('id', $notifyInfo->operate_user_id)->first(['nickname', 'id']); - - $invite = [ - 'type' => $notifyInfo->type, + $invite = [ + 'type' => $notifyInfo->type, 'operate_user' => ['id' => $userInfo->id, 'nickname' => $userInfo->nickname], - 'users' => User::whereIn('id', parse_ids($notifyInfo->user_ids))->get(['id', 'nickname'])->toArray() + 'users' => User::whereIn('id', parse_ids($notifyInfo->user_ids))->get(['id', 'nickname'])->toArray() ]; unset($notifyInfo, $userInfo); break; - case 4://会话记录消息 - $forward = ['num' => 0, 'list' => []]; - + case 4: //会话记录消息 + $forward = ['num' => 0, 'list' => []]; $forwardInfo = ChatRecordsForward::where('record_id', $result->id)->first(['records_id', 'text']); if ($forwardInfo) { $forward = [ - 'num' => count(parse_ids($forwardInfo->records_id)), + 'num' => count(parse_ids($forwardInfo->records_id)), 'list' => json_decode($forwardInfo->text, true) ?? [] ]; } break; - case 5://代码块消息 + case 5: //代码块消息 $code_block = ChatRecordsCode::where('record_id', $result->id)->first(['record_id', 'code_lang', 'code']); $code_block = $code_block ? $code_block->toArray() : []; break; } $notify = [ - 'send_user' => $data['data']['sender'], + 'send_user' => $data['data']['sender'], 'receive_user' => $data['data']['receive'], - 'source_type' => $data['data']['source'], - 'data' => $this->formatTalkMessage([ - 'id' => $result->id, - 'msg_type' => $result->msg_type, - 'source' => $result->source, - 'avatar' => $result->avatar, - 'nickname' => $result->nickname, - "user_id" => $result->user_id, + 'source_type' => $data['data']['source'], + 'data' => $this->formatTalkMessage([ + 'id' => $result->id, + 'msg_type' => $result->msg_type, + 'source' => $result->source, + 'avatar' => $result->avatar, + 'nickname' => $result->nickname, + 'group_name' => $group_name, + "user_id" => $result->user_id, "receive_id" => $result->receive_id, "created_at" => $result->created_at, - "content" => $result->content, - "file" => $file, + "content" => $result->content, + "file" => $file, "code_block" => $code_block, - 'forward' => $forward, - 'invite' => $invite + 'forward' => $forward, + 'invite' => $invite ]) ]; @@ -324,9 +322,9 @@ class ChatMessageConsumer extends ConsumerMessage $this->socketPushNotify( array_unique($fds), json_encode([SocketConstants::EVENT_REVOKE_TALK, [ - 'record_id' => $record->id, - 'source' => $record->source, - 'user_id' => $record->user_id, + 'record_id' => $record->id, + 'source' => $record->source, + 'user_id' => $record->user_id, 'receive_id' => $record->receive_id, ]]) ); @@ -373,23 +371,24 @@ class ChatMessageConsumer extends ConsumerMessage private function formatTalkMessage(array $data): array { $message = [ - "id" => 0,//消息记录ID - "source" => 1,//消息来源[1:好友私信;2:群聊] - "msg_type" => 1, - "user_id" => 0,//发送者用户ID - "receive_id" => 0,//接收者ID[好友ID或群ID] - "content" => '',//文本消息 - "is_revoke" => 0,//消息是否撤销 + "id" => 0,// 消息记录ID + "source" => 1,// 消息来源[1:好友私信;2:群聊] + "msg_type" => 1, + "user_id" => 0,// 发送者用户ID + "receive_id" => 0,// 接收者ID[好友ID或群ID] + "content" => '',// 文本消息 + "is_revoke" => 0,// 消息是否撤销 // 发送消息人的信息 - "nickname" => "", - "avatar" => "", + "nickname" => "",// 用户昵称 + "avatar" => "",// 用户头像 + "group_name" => "",// 群组名称 // 不同的消息类型 - "file" => [], + "file" => [], "code_block" => [], - "forward" => [], - "invite" => [], + "forward" => [], + "invite" => [], // 消息创建时间 "created_at" => "",