hyperf-chat/app/Amqp/Consumer/ChatMessageConsumer.php

408 lines
13 KiB
PHP
Raw Normal View History

2020-11-02 22:45:37 +08:00
<?php
declare(strict_types=1);
2020-12-26 21:33:40 +08:00
/**
* This is my open source code, please do not use it for commercial applications.
* For the full copyright and license information,
* please view the LICENSE file that was distributed with this source code
*
* @author Yuandong<837215079@qq.com>
* @link https://github.com/gzydong/hyperf-chat
*/
2020-11-02 22:45:37 +08:00
namespace App\Amqp\Consumer;
2021-07-05 21:52:44 +08:00
use App\Constants\TalkMsgType;
use App\Constants\TalkType;
2020-11-22 23:10:00 +08:00
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Message\Type;
use Hyperf\Amqp\Builder\QueueBuilder;
use PhpAmqpLib\Message\AMQPMessage;
use App\Model\User;
2020-11-29 14:44:11 +08:00
use App\Model\UsersFriend;
2021-07-05 21:52:44 +08:00
use App\Model\Chat\TalkRecords;
use App\Model\Chat\TalkRecordsCode;
use App\Model\Chat\TalkRecordsFile;
use App\Model\Chat\TalkRecordsInvite;
use App\Model\Chat\TalkRecordsForward;
2021-05-20 22:56:55 +08:00
use App\Model\Group\Group;
2020-11-29 14:44:11 +08:00
use App\Service\SocketClientService;
2020-12-01 17:47:25 +08:00
use App\Constants\SocketConstants;
2021-05-20 22:56:55 +08:00
use App\Cache\Repository\LockRedis;
2021-05-21 22:56:42 +08:00
use App\Cache\SocketRoom;
2020-11-02 22:45:37 +08:00
2021-04-22 16:54:34 +08:00
/**
* 消息推送消费者队列
* @Consumer(name="ConsumerChat",enable=true)
*/
2020-11-04 11:57:16 +08:00
class ChatMessageConsumer extends ConsumerMessage
2020-11-02 22:45:37 +08:00
{
/**
 * Name of the exchange this consumer is bound to.
 *
 * @var string
 */
public $exchange = SocketConstants::CONSUMER_MESSAGE_EXCHANGE;

/**
 * Type of the exchange (fanout).
 *
 * @var string
 */
public $type = Type::FANOUT;

/**
 * Routing key.
 *
 * @var string
 */
public $routingKey = 'consumer:im:message';

/**
 * Service used by the handlers to look up the socket fds of a user.
 *
 * @var SocketClientService
 */
private $socketClientService;

/**
 * Binding between a message event and the method of this class that
 * handles it. consumeMessage() uses this table to dispatch.
 *
 * @var array
 */
const EVENTS = [
    // Talk (chat) message event
    SocketConstants::EVENT_TALK          => 'onConsumeTalk',

    // Keyboard input event
    SocketConstants::EVENT_KEYBOARD      => 'onConsumeKeyboard',

    // User online status event
    SocketConstants::EVENT_ONLINE_STATUS => 'onConsumeOnlineStatus',

    // Talk message revoke event
    SocketConstants::EVENT_REVOKE_TALK   => 'onConsumeRevokeTalk',

    // Friend application event
    SocketConstants::EVENT_FRIEND_APPLY  => 'onConsumeFriendApply'
];
2020-11-21 19:53:01 +08:00
/**
 * Build the consumer and point it at this server instance's own queue.
 *
 * @param SocketClientService $socketClientService Service used to look up the socket fds of a user.
 */
public function __construct(SocketClientService $socketClientService)
{
    // The queue name embeds SERVER_RUN_ID, so it has to be set at runtime
    // instead of being declared as a static property value.
    $queueName = 'queue:im_message:' . SERVER_RUN_ID;
    $this->setQueue($queueName);

    $this->socketClientService = $socketClientService;
}
2020-11-02 22:45:37 +08:00
/**
 * Override of the parent queue builder factory.
 *
 * Takes the builder produced by the parent class and switches its
 * auto-delete flag on.
 *
 * @return QueueBuilder
 */
public function getQueueBuilder(): QueueBuilder
{
    $builder = parent::getQueueBuilder();

    return $builder->setAutoDelete(true);
}
/**
 * Consume one queue message and dispatch it to the handler bound to its event.
 *
 * The message is acknowledged without any further work when:
 *  - it is not an array or carries no `event` key;
 *  - its event has no handler registered in self::EVENTS;
 *  - it carries a `uuid` and the de-duplication lock for that uuid on this
 *    server instance cannot be acquired (the message is treated as a repeat).
 *
 * A message without a `uuid` cannot be de-duplicated and is dispatched as is.
 *
 * @param mixed       $data    Decoded queue message; expected keys are `event`, `uuid` and `data`.
 * @param AMQPMessage $message Raw AMQP message, forwarded to the handler.
 * @return string Result of the handler, or Result::ACK when the message is skipped.
 */
public function consumeMessage($data, AMQPMessage $message): string
{
    // Handlers are declared with an `array $data` parameter, so anything
    // else can only be acknowledged and dropped.
    if (!is_array($data) || !isset($data['event'])) {
        return Result::ACK;
    }

    $event = $data['event'];

    // An event without a bound handler used to end up as a call to a
    // method named null, which is a fatal error. Drop such messages.
    if (!(is_string($event) || is_int($event)) || !isset(self::EVENTS[$event])) {
        return Result::ACK;
    }

    // [Lock] Prevent the same message from being consumed twice.
    // NOTE(review): the second argument (60) is presumably the lock lifetime in seconds - confirm against LockRedis.
    if (isset($data['uuid'])) {
        $lockName = sprintf('ws-message:%s-%s', SERVER_RUN_ID, $data['uuid']);
        if (!LockRedis::getInstance()->lock($lockName, 60)) {
            return Result::ACK;
        }
    }

    // Call the handler method bound to the event.
    $handler = self::EVENTS[$event];

    return $this->{$handler}($data, $message);
}
/**
 * Handle a talk (chat) message event.
 *
 * Collects the socket fds that should receive the message (sender and
 * receiver for a private chat, every room member for a group chat), loads
 * the talk record together with the type-specific detail row (file,
 * forward, code block or group invite) and pushes the formatted message to
 * those fds.
 *
 * Nothing is pushed when no fd is found or when the talk record no longer
 * exists. Missing detail rows leave the matching section of the payload
 * empty instead of aborting the consumer.
 *
 * @param array       $data    Queue message; reads `data.talk_type`, `data.sender_id`,
 *                             `data.receiver_id` and `data.record_id`.
 * @param AMQPMessage $message Raw AMQP message (unused).
 * @return string Always Result::ACK.
 */
public function onConsumeTalk(array $data, AMQPMessage $message): string
{
    $talk_type   = $data['data']['talk_type'];
    $sender_id   = $data['data']['sender_id'];
    $receiver_id = $data['data']['receiver_id'];
    $record_id   = $data['data']['record_id'];

    $fds       = [];
    $groupInfo = null;

    if ($talk_type == TalkType::PRIVATE_CHAT) {
        $fds = array_merge(
            $this->socketClientService->findUserFds($sender_id),
            $this->socketClientService->findUserFds($receiver_id)
        );
    } else if ($talk_type == TalkType::GROUP_CHAT) {
        // For a group chat the receiver id is the group id, which is also the room name.
        foreach (SocketRoom::getInstance()->getRoomMembers(strval($receiver_id)) as $uid) {
            $fds = array_merge($fds, $this->socketClientService->findUserFds(intval($uid)));
        }

        $groupInfo = Group::where('id', $receiver_id)->first(['group_name', 'avatar']);
    }

    // De-duplicate the client fds; nothing to do when nobody is connected.
    if (!$fds = array_unique($fds)) {
        return Result::ACK;
    }

    $result = TalkRecords::leftJoin('users', 'users.id', '=', 'talk_records.user_id')
        ->where('talk_records.id', $record_id)
        ->first([
            'talk_records.id',
            'talk_records.talk_type',
            'talk_records.msg_type',
            'talk_records.user_id',
            'talk_records.receiver_id',
            'talk_records.content',
            'talk_records.is_revoke',
            'talk_records.created_at',
            'users.nickname',
            'users.avatar',
        ]);

    if (!$result) {
        return Result::ACK;
    }

    $file = $code_block = $forward = $invite = [];

    switch ($result->msg_type) {
        case TalkMsgType::FILE_MESSAGE:
            $file = TalkRecordsFile::where('record_id', $result->id)->first([
                'id', 'record_id', 'user_id', 'file_source', 'file_type',
                'save_type', 'original_name', 'file_suffix', 'file_size', 'save_dir'
            ]);

            $file = $file ? $file->toArray() : [];
            if ($file) {
                $file['file_url'] = get_media_url($file['save_dir']);
            }
            break;

        case TalkMsgType::FORWARD_MESSAGE:
            $forward     = ['num' => 0, 'list' => []];
            $forwardInfo = TalkRecordsForward::where('record_id', $result->id)->first(['records_id', 'text']);
            if ($forwardInfo) {
                $forward = [
                    'num'  => count(parse_ids($forwardInfo->records_id)),
                    'list' => json_decode($forwardInfo->text, true) ?? []
                ];
            }
            break;

        case TalkMsgType::CODE_MESSAGE:
            $code_block = TalkRecordsCode::where('record_id', $result->id)->first(['record_id', 'code_lang', 'code']);
            $code_block = $code_block ? $code_block->toArray() : [];
            break;

        case TalkMsgType::GROUP_INVITE_MESSAGE:
            $notifyInfo = TalkRecordsInvite::where('record_id', $result->id)->first([
                'record_id', 'type', 'operate_user_id', 'user_ids'
            ]);

            // first() yields null when the invite row is missing; keep
            // `invite` empty instead of dereferencing null.
            if ($notifyInfo) {
                $userInfo = User::where('id', $notifyInfo->operate_user_id)->first(['nickname', 'id']);

                $invite = [
                    'type'         => $notifyInfo->type,
                    // The operating user may no longer exist; fall back to
                    // the stored id and an empty nickname.
                    'operate_user' => [
                        'id'       => $userInfo ? $userInfo->id : $notifyInfo->operate_user_id,
                        'nickname' => $userInfo ? $userInfo->nickname : '',
                    ],
                    'users'        => User::whereIn('id', parse_ids($notifyInfo->user_ids))->get(['id', 'nickname'])->toArray()
                ];

                unset($userInfo);
            }

            unset($notifyInfo);
            break;
    }

    $notify = [
        'sender_id'   => $sender_id,
        'receiver_id' => $receiver_id,
        'talk_type'   => $talk_type,
        'data'        => $this->formatTalkMessage([
            'id'           => $result->id,
            'talk_type'    => $result->talk_type,
            'msg_type'     => $result->msg_type,
            "user_id"      => $result->user_id,
            "receiver_id"  => $result->receiver_id,
            'avatar'       => $result->avatar,
            'nickname'     => $result->nickname,
            'group_name'   => $groupInfo ? $groupInfo->group_name : '',
            'group_avatar' => $groupInfo ? $groupInfo->avatar : '',
            "created_at"   => $result->created_at,
            "content"      => $result->content,
            "file"         => $file,
            "code_block"   => $code_block,
            'forward'      => $forward,
            'invite'       => $invite,
            // The column is selected above and formatTalkMessage() has a
            // slot for it, so forward the stored value instead of always
            // sending the default.
            'is_revoke'    => $result->is_revoke
        ])
    ];

    $this->socketPushNotify($fds, json_encode([SocketConstants::EVENT_TALK, $notify]));

    return Result::ACK;
}
/**
 * Handle a keyboard input event.
 *
 * Forwards the `data` section of the queue message, unchanged, to every
 * socket fd of the user named by `data.receiver_id`.
 *
 * @param array       $data    Queue message.
 * @param AMQPMessage $message Raw AMQP message (unused).
 * @return string Always Result::ACK.
 */
public function onConsumeKeyboard(array $data, AMQPMessage $message): string
{
    $payload     = $data['data'];
    $receiverFds = $this->socketClientService->findUserFds($payload['receiver_id']);
    $body        = json_encode([SocketConstants::EVENT_KEYBOARD, $payload]);

    $this->socketPushNotify($receiverFds, $body);

    return Result::ACK;
}
/**
 * Handle a user online/offline event.
 *
 * Pushes the user's id and status to the socket fds of every friend
 * returned by UsersFriend::getFriendIds().
 *
 * @param array       $data    Queue message; reads `data.user_id` and `data.status`.
 * @param AMQPMessage $message Raw AMQP message (unused).
 * @return string Always Result::ACK.
 */
public function onConsumeOnlineStatus(array $data, AMQPMessage $message): string
{
    $payload = $data['data'];
    $user_id = intval($payload['user_id']);
    $status  = intval($payload['status']);

    // Gather the fds of all friends of the user.
    $friendFds = [];
    foreach (UsersFriend::getFriendIds($user_id) as $friend_id) {
        $found     = $this->socketClientService->findUserFds((int)$friend_id);
        $friendFds = array_merge($friendFds, $found);
    }

    $body = json_encode([
        SocketConstants::EVENT_ONLINE_STATUS,
        ['user_id' => $user_id, 'status' => $status]
    ]);

    $this->socketPushNotify(array_unique($friendFds), $body);

    return Result::ACK;
}
/**
 * Handle a talk message revoke event.
 *
 * Loads the talk record named by `data.record_id` and notifies the
 * affected clients: sender and receiver for a private chat, every room
 * member for a group chat. When the record cannot be found the message is
 * acknowledged and nothing is pushed.
 *
 * @param array       $data    Queue message; reads `data.record_id`.
 * @param AMQPMessage $message Raw AMQP message (unused).
 * @return string Always Result::ACK.
 */
public function onConsumeRevokeTalk(array $data, AMQPMessage $message): string
{
    $record = TalkRecords::where('id', $data['data']['record_id'])->first(['id', 'talk_type', 'user_id', 'receiver_id']);

    // first() yields null when the record does not exist (any more); there
    // is nobody to notify then, and dereferencing null would abort the consumer.
    if (!$record) {
        return Result::ACK;
    }

    $fds = [];
    if ($record->talk_type == TalkType::PRIVATE_CHAT) {
        $fds = array_merge($fds, $this->socketClientService->findUserFds($record->user_id));
        $fds = array_merge($fds, $this->socketClientService->findUserFds($record->receiver_id));
    } else if ($record->talk_type == TalkType::GROUP_CHAT) {
        // For a group chat the receiver id is used as the room name.
        $userIds = SocketRoom::getInstance()->getRoomMembers(strval($record->receiver_id));
        foreach ($userIds as $uid) {
            $fds = array_merge($fds, $this->socketClientService->findUserFds((int)$uid));
        }
    }

    $fds = array_unique($fds);

    $this->socketPushNotify($fds, json_encode([SocketConstants::EVENT_REVOKE_TALK, [
        'talk_type'   => $record->talk_type,
        'sender_id'   => $record->user_id,
        'receiver_id' => $record->receiver_id,
        'record_id'   => $record->id,
    ]]));

    return Result::ACK;
}
2020-11-27 19:48:41 +08:00
/**
 * Handle a friend application event.
 *
 * Forwards the `data` section of the queue message, unchanged, to every
 * socket fd of the user named by `data.receive`.
 *
 * @param array       $data    Queue message.
 * @param AMQPMessage $message Raw AMQP message (unused).
 * @return string Always Result::ACK.
 */
public function onConsumeFriendApply(array $data, AMQPMessage $message): string
{
    $receiverFds = $this->socketClientService->findUserFds($data['data']['receive']);
    $receiverFds = array_unique($receiverFds);

    $body = json_encode([SocketConstants::EVENT_FRIEND_APPLY, $data['data']]);

    $this->socketPushNotify($receiverFds, $body);

    return Result::ACK;
}
/**
 * Push a WebSocket message to a set of client connections.
 *
 * Each fd is cast to an integer; the message is pushed only to fds the
 * server reports as existing.
 *
 * @param iterable $fds     Client fds to push to.
 * @param mixed    $message Message handed to the server's push() as is.
 */
private function socketPushNotify($fds, $message)
{
    $server = server();

    foreach ($fds as $fd) {
        $clientFd = (int)$fd;

        // Skip connections the server no longer knows about.
        if (!$server->exist($clientFd)) {
            continue;
        }

        $server->push($clientFd, $message);
    }
}
2020-11-27 19:48:41 +08:00
2020-12-01 13:54:40 +08:00
/**
 * Normalise a talk message into the fixed structure sent to clients.
 *
 * Starts from a template holding every supported key with its default
 * value, then overrides the defaults with the values supplied in $data.
 * Keys of $data that are not part of the template are discarded.
 *
 * @param array $data Talk message fields.
 * @return array Message containing exactly the template keys, in template order.
 */
private function formatTalkMessage(array $data): array
{
    $formatted = [
        "id"           => 0,  // Message record ID
        "talk_type"    => 1,  // Message source [1: private chat; 2: group chat]
        "msg_type"     => 1,  // Message type
        "user_id"      => 0,  // ID of the sending user
        "receiver_id"  => 0,  // Receiver ID [friend ID or group ID]

        // Information about the sender
        "nickname"     => "", // User nickname
        "avatar"       => "", // User avatar
        "group_name"   => "", // Group name
        "group_avatar" => "", // Group avatar

        // Sections for the different message types
        "file"         => [],
        "code_block"   => [],
        "forward"      => [],
        "invite"       => [],

        "content"      => '', // Text content
        "created_at"   => "", // Creation time of the message

        // Message attributes
        "is_revoke"    => 0,  // Whether the message has been revoked
    ];

    // Copy over only the fields the template knows about.
    foreach ($data as $field => $value) {
        if (array_key_exists($field, $formatted)) {
            $formatted[$field] = $value;
        }
    }

    return $formatted;
}
2020-11-02 22:45:37 +08:00
}