初始化

main
gzydong 2020-11-22 23:10:00 +08:00
parent b05d25e1de
commit db2ac5ed39
9 changed files with 408 additions and 99 deletions

View File

@ -4,21 +4,22 @@ declare(strict_types=1);
namespace App\Amqp\Consumer;
use App\Model\UsersFriend;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Redis\Redis;
use Hyperf\Amqp\Message\Type;
use Hyperf\Amqp\Builder\QueueBuilder;
use PhpAmqpLib\Message\AMQPMessage;
use App\Model\User;
use App\Helper\PushMessageHelper;
use App\Model\Chat\ChatRecord;
use App\Model\Chat\ChatRecordsCode;
use App\Model\Chat\ChatRecordsFile;
use App\Model\User;
use App\Model\Chat\ChatRecordsInvite;
use App\Service\SocketFDService;
use App\Service\SocketRoomService;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Redis\Redis;
use PhpAmqpLib\Message\AMQPMessage;
use Hyperf\Amqp\Message\Type;
use Hyperf\Amqp\Builder\QueueBuilder;
/**
* @Consumer(name="聊天消息消费者",enable=true)
@ -56,9 +57,17 @@ class ChatMessageConsumer extends ConsumerMessage
*/
private $socketRoomService;
const EVENTS = [
'event_talk' => 'onConsumeTalk',
'event_keyboard' => 'onConsumeKeyboard',
'event_online_status' => 'onConsumeOnlineStatus',
'event_revoke_talk' => 'onConsumeRevokeTalk',
];
/**
* ChatMessageConsumer constructor.
* @param SocketFDService $socketFDService
* @param SocketRoomService $socketRoomService
*/
public function __construct(SocketFDService $socketFDService, SocketRoomService $socketRoomService)
{
@ -87,6 +96,22 @@ class ChatMessageConsumer extends ConsumerMessage
* @return string
*/
public function consumeMessage($data, AMQPMessage $message): string
{
if (isset($data['event'])) {
return $this->{self::EVENTS[$data['event']]}($data, $message);
}
return Result::ACK;
}
/**
* 对话聊天消息
*
* @param array $data 队列消息
* @param AMQPMessage $message
* @return string
*/
public function onConsumeTalk(array $data, AMQPMessage $message)
{
$redis = container()->get(Redis::class);
@ -96,13 +121,13 @@ class ChatMessageConsumer extends ConsumerMessage
return Result::ACK;
}
$source = $data['source'];
$source = $data['data']['source'];
$fids = $this->socketFDService->findUserFds($data['sender']);
$fids = $this->socketFDService->findUserFds($data['data']['sender']);
if ($source == 1) {// 私聊
$fids = array_merge($fids, $this->socketFDService->findUserFds($data['receive']));
$fids = array_merge($fids, $this->socketFDService->findUserFds($data['data']['receive']));
} else if ($source == 2) {//群聊
$userIds = $this->socketRoomService->getRoomMembers(strval($data['receive']));
$userIds = $this->socketRoomService->getRoomMembers(strval($data['data']['receive']));
foreach ($userIds as $uid) {
$fids = array_merge($fids, $this->socketFDService->findUserFds(intval($uid)));
}
@ -118,7 +143,7 @@ class ChatMessageConsumer extends ConsumerMessage
* @var ChatRecord
*/
$result = ChatRecord::leftJoin('users', 'users.id', '=', 'chat_records.user_id')
->where('chat_records.id', $data['record_id'])
->where('chat_records.id', $data['data']['record_id'])
->first([
'chat_records.id',
'chat_records.source',
@ -133,6 +158,10 @@ class ChatMessageConsumer extends ConsumerMessage
'users.avatar as avatar',
]);
if (!$result) {
return Result::ACK;
}
$file = [];
$code_block = [];
$forward = [];
@ -171,9 +200,9 @@ class ChatMessageConsumer extends ConsumerMessage
}
$msg = [
'send_user' => $data['sender'],
'receive_user' => $data['receive'],
'source_type' => $data['source'],
'send_user' => $data['data']['sender'],
'receive_user' => $data['data']['receive'],
'source_type' => $data['data']['source'],
'data' => PushMessageHelper::formatTalkMsg([
'id' => $result->id,
'msg_type' => $result->msg_type,
@ -195,11 +224,99 @@ class ChatMessageConsumer extends ConsumerMessage
foreach ($fids as $fd) {
$fd = intval($fd);
if ($server->exist($fd)) {
$server->push($fd, json_encode(['chat_message', $msg]));
$server->push($fd, json_encode(['event_talk', $msg]));
}
}
return Result::ACK;
}
/**
* 键盘输入事件消息
*
* @param array $data 队列消息
* @param AMQPMessage $message
* @return string
*/
public function onConsumeKeyboard(array $data, AMQPMessage $message)
{
$fds = $this->socketFDService->findUserFds($data['data']['receive_user']);
$server = server();
foreach ($fds as $fd) {
$fd = intval($fd);
if ($server->exist($fd)) {
$server->push($fd, json_encode(['event_keyboard', $data['data']]));
}
}
return Result::ACK;
}
/**
* 用户上线或下线消息
*
* @param array $data 队列消息
* @param AMQPMessage $message
* @return string
*/
public function onConsumeOnlineStatus(array $data, AMQPMessage $message)
{
$user_id = $data['data']['user_id'];
$friends = UsersFriend::getFriendIds(intval($user_id));
$fds = [];
foreach ($friends as $friend_id) {
$fds = array_merge($fds, $this->socketFDService->findUserFds(intval($friend_id)));
}
$fds = array_unique($fds);
$server = server();
foreach ($fds as $fd) {
$fd = intval($fd);
if ($server->exist($fd)) {
$server->push($fd, json_encode(['event_online_status', $data['data']]));
}
}
return Result::ACK;
}
/**
* 撤销聊天消息
*
* @param array $data 队列消息
* @param AMQPMessage $message
* @return string
*/
public function onConsumeRevokeTalk(array $data, AMQPMessage $message)
{
$record = ChatRecord::where('id', $data['data']['record_id'])->first(['id', 'source', 'user_id', 'receive_id']);
$fds = [];
if ($record->source == 1) {
$fds = array_merge($fds, $this->socketFDService->findUserFds($record->user_id));
$fds = array_merge($fds, $this->socketFDService->findUserFds($record->receive_id));
} else if ($record->source == 2) {
$userIds = $this->socketRoomService->getRoomMembers(strval($record->receive_id));
foreach ($userIds as $uid) {
$fds = array_merge($fds, $this->socketFDService->findUserFds(intval($uid)));
}
}
$fds = array_unique($fds);
$server = server();
foreach ($fds as $fd) {
$fd = intval($fd);
if ($server->exist($fd)) {
$server->push($fd, json_encode(['event_revoke_talk', [
'record_id' => $record->id,
'source' => $record->source,
'user_id' => $record->user_id,
'receive_id' => $record->receive_id,
]]));
}
}
unset($fids, $result, $msg);
return Result::ACK;
}
}

View File

@ -14,14 +14,31 @@ class ChatMessageProducer extends ProducerMessage
public $type = Type::FANOUT;
public function __construct($sender, $receive, $source, $record_id)
const EVENTS = [
'event_talk',
'event_keyboard',
'event_online_status',
'event_revoke_talk'
];
/**
* 初始化处理...
*
* @param string $event 事件名
* @param array $data 数据
* @param array $options 其它参数
*/
public function __construct(string $event, array $data, array $options = [])
{
if (!in_array($event, self::EVENTS)) {
new \Exception('事件名未注册...');
}
$message = [
'uuid' => $this->uuid(),
'sender' => intval($sender), //发送者ID
'receive' => intval($receive), //接收者ID
'source' => intval($source), //接收者类型 1:好友;2:群组
'record_id' => intval($record_id)
'uuid' => $this->uuid(),// 自定义消息ID
'event' => $event,
'data' => $data,
'options' => $options
];
$this->payload = $message;

View File

@ -75,7 +75,12 @@ class GroupController extends CController
// ...消息推送队列
$this->producer->produce(
new ChatMessageProducer($user_id, $data['group_id'], 2, $data['record_id'])
new ChatMessageProducer('event_talk', [
'sender' => $user_id, //发送者ID
'receive' => intval($data['group_id']), //接收者ID
'source' => 2, //接收者类型 1:好友;2:群组
'record_id' => intval($data['record_id'])
])
);
return $this->response->success([
@ -134,7 +139,12 @@ class GroupController extends CController
// ...消息推送队列
$this->producer->produce(
new ChatMessageProducer($user_id, $params['group_id'], 2, $record_id)
new ChatMessageProducer('event_talk', [
'sender' => $user_id, //发送者ID
'receive' => intval($params['group_id']), //接收者ID
'source' => 2, //接收者类型 1:好友;2:群组
'record_id' => $record_id
])
);
return $this->response->success([], '好友已成功加入群聊...');
@ -163,7 +173,12 @@ class GroupController extends CController
// ...消息推送队列
$this->producer->produce(
new ChatMessageProducer($user_id, $params['group_id'], 2, $record_id)
new ChatMessageProducer('event_talk', [
'sender' => $user_id, //发送者ID
'receive' => intval($params['group_id']), //接收者ID
'source' => 2, //接收者类型 1:好友;2:群组
'record_id' => $record_id
])
);
return $this->response->success([], '已成功退出群组...');
@ -224,7 +239,12 @@ class GroupController extends CController
// ...消息推送队列
$this->producer->produce(
new ChatMessageProducer($user_id, $params['group_id'], 2, $record_id)
new ChatMessageProducer('event_talk', [
'sender' => $user_id, //发送者ID
'receive' => intval($params['group_id']), //接收者ID
'source' => 2, //接收者类型 1:好友;2:群组
'record_id' => $record_id
])
);
return $this->response->success([], '已成功退出群组...');

View File

@ -227,6 +227,14 @@ class TalkController extends CController
]);
[$isTrue, $message,] = $this->talkService->revokeRecord($this->uid(), $params['record_id']);
if($isTrue){
$this->producer->produce(
new ChatMessageProducer('event_revoke_talk', [
'record_id' => $params['record_id']
])
);
}
return $isTrue
? $this->response->success([], $message)
@ -323,7 +331,12 @@ class TalkController extends CController
// ...消息推送队列
foreach ($ids as $value) {
$this->producer->produce(
new ChatMessageProducer($user_id, $value['receive_id'], $value['source'], $value['record_id'])
new ChatMessageProducer('event_talk', [
'sender' => $user_id, //发送者ID
'receive' => intval($value['receive_id']), //接收者ID
'source' => intval($value['source']), //接收者类型 1:好友;2:群组
'record_id' => $value['record_id']
])
);
}
@ -518,7 +531,12 @@ class TalkController extends CController
// ...消息推送队列
$this->producer->produce(
new ChatMessageProducer($user_id, $params['receive_id'], $params['source'], $record_id)
new ChatMessageProducer('event_talk', [
'sender' => $user_id, //发送者ID
'receive' => intval($params['receive_id']), //接收者ID
'source' => intval($params['source']), //接收者类型 1:好友;2:群组
'record_id' => $record_id
])
);
return $this->response->success();
@ -558,7 +576,12 @@ class TalkController extends CController
// ...消息推送队列
$this->producer->produce(
new ChatMessageProducer($user_id, $params['receive_id'], $params['source'], $record_id)
new ChatMessageProducer('event_talk', [
'sender' => $user_id, //发送者ID
'receive' => intval($params['receive_id']), //接收者ID
'source' => intval($params['source']), //接收者类型 1:好友;2:群组
'record_id' => $record_id
])
);
return $this->response->success();
@ -615,7 +638,12 @@ class TalkController extends CController
// ...消息推送队列
$this->producer->produce(
new ChatMessageProducer($user_id, $params['receive_id'], $params['source'], $record_id)
new ChatMessageProducer('event_talk', [
'sender' => $user_id, //发送者ID
'receive' => intval($params['receive_id']), //接收者ID
'source' => intval($params['source']), //接收者类型 1:好友;2:群组
'record_id' => $record_id
])
);
@ -668,7 +696,12 @@ class TalkController extends CController
// ...消息推送队列
$this->producer->produce(
new ChatMessageProducer($user_id, $params['receive_id'], $params['source'], $record_id)
new ChatMessageProducer('event_talk', [
'sender' => $user_id, //发送者ID
'receive' => intval($params['receive_id']), //接收者ID
'source' => intval($params['source']), //接收者类型 1:好友;2:群组
'record_id' => $record_id
])
);
return $this->response->success();

View File

@ -3,11 +3,6 @@ declare(strict_types=1);
namespace App\Controller;
use App\Cache\LastMsgCache;
use App\Cache\UnreadTalkCache;
use App\Model\Chat\ChatRecord;
use App\Model\Group\UsersGroupMember;
use App\Service\SocketRoomService;
use Hyperf\Di\Annotation\Inject;
use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
@ -16,11 +11,14 @@ use Phper666\JWTAuth\JWT;
use Swoole\Http\Request;
use Swoole\Websocket\Frame;
use Hyperf\Amqp\Producer;
use App\Amqp\Producer\ChatMessageProducer;
use Swoole\Http\Response;
use Swoole\WebSocket\Server;
use App\Traits\WebSocketTrait;
use App\Service\SocketFDService;
use App\Service\MessageHandleService;
use App\Service\SocketRoomService;
use App\Model\Group\UsersGroupMember;
use App\Amqp\Producer\ChatMessageProducer;
/**
* Class WebSocketController
@ -54,6 +52,17 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
*/
private $socketRoomService;
/**
* @inject
* @var MessageHandleService
*/
private $messageHandleService;
const EVENTS = [
'event_talk' => 'onTalk',
'event_keyboard' => 'onKeyboard',
];
/**
* 连接创建成功回调事件
*
@ -66,6 +75,9 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
$userInfo = $this->jwt->getParserData($token);
stdout_log()->notice("用户连接信息 : user_id:{$userInfo['user_id']} | fd:{$request->fd} 时间:" . date('Y-m-d H:i:s'));
// 判断是否存在异地登录
$isOnline = $this->socketFDService->isOnlineAll(intval($userInfo['user_id']));
// 绑定fd与用户关系
$this->socketFDService->bindRelation($request->fd, $userInfo['user_id']);
@ -74,6 +86,17 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
foreach ($groupIds as $group_id) {
$this->socketRoomService->addRoomMember($userInfo['user_id'], $group_id);
}
if (!$isOnline) {
// 推送消息至队列
$this->producer->produce(
new ChatMessageProducer('event_online_status', [
'user_id' => $userInfo['user_id'],
'status' => 1,
'notify' => '好友上线通知...'
])
);
}
}
/**
@ -87,59 +110,12 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
// 判断是否为心跳检测
if ($frame->data == 'PING') return;
// 当前用户ID
$user_id = $this->socketFDService->findFdUserId($frame->fd);
[$event, $data] = array_values(json_decode($frame->data, true));
if ($user_id != $data['send_user']) {
if (isset(self::EVENTS[$event])) {
$this->messageHandleService->{self::EVENTS[$event]}($server, $frame, $data);
return;
}
//验证消息类型 私聊|群聊
if (!in_array($data['source_type'], [1, 2])) {
return;
}
//验证发送消息用户与接受消息用户之间是否存在好友或群聊关系(后期走缓存)
// if ($data['source_type'] == 1) {//私信
// //判断发送者和接受者是否是好友关系
// if (!UsersFriend::isFriend(intval($data['send_user']), intval($data['receive_user']))) {
// return;
// }
// } else if ($data['source_type'] == 2) {//群聊
// //判断是否属于群成员
// if (!UsersGroup::isMember(intval($data['receive_user']), intval($data['send_user']))) {
// return;
// }
// }
$result = ChatRecord::create([
'source' => $data['source_type'],
'msg_type' => 1,
'user_id' => $data['send_user'],
'receive_id' => $data['receive_user'],
'content' => htmlspecialchars($data['text_message']),
'created_at' => date('Y-m-d H:i:s'),
]);
if (!$result) return;
// 判断是否私聊
if ($data['source_type'] == 1) {
$msg_text = mb_substr($result->content, 0, 30);
// 缓存最后一条消息
LastMsgCache::set([
'text' => $msg_text,
'created_at' => $result->created_at
], intval($data['receive_user']), intval($data['send_user']));
// 设置好友消息未读数
make(UnreadTalkCache::class)->setInc(intval($result->receive_id), strval($result->user_id));
}
$this->producer->produce(
new ChatMessageProducer($data['send_user'], $data['receive_user'], $data['source_type'], $result->id)
);
}
/**
@ -153,18 +129,23 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
{
$user_id = $this->socketFDService->findFdUserId($fd);
stdout_log()->notice("客户端FD:{$fd} 已关闭连接 用户ID为【{$user_id}】,关闭时间:" . date('Y-m-d H:i:s'));
// stdout_log()->notice("客户端FD:{$fd} 已关闭连接 用户ID为【{$user_id}】,关闭时间:" . date('Y-m-d H:i:s'));
// 解除fd关系
$this->socketFDService->removeRelation($fd);
// 判断是否存在异地登录
$isOnline = $this->socketFDService->isOnlineAll(intval($user_id));
if (!$isOnline) {
// ... 不存在异地登录,推送下线通知消息
// ... 包装推送消息至队列
} else {
stdout_log()->notice("用户:{$user_id} 存在异地登录...");
if (!$isOnline) {
// 推送消息至队列
$this->producer->produce(
new ChatMessageProducer('event_online_status', [
'user_id' => $user_id,
'status' => 0,
'notify' => '好友离线通知通知...'
])
);
}
}
}

10
app/Helper/JsonHelper.php Normal file
View File

@ -0,0 +1,10 @@
<?php
namespace App\Helper;
class JsonHelper
{
}

View File

@ -266,6 +266,8 @@ class GroupService extends BaseService
throw new Exception('添加群通知记录失败2 : quitGroupChat');
}
UsersChatList::where('uid', $user_id)->where('group_id', $group_id)->update(['status' => 0, 'updated_at' => date('Y-m-d H:i:s')]);
$record_id = $result->id;
}
@ -322,6 +324,10 @@ class GroupService extends BaseService
throw new Exception('添加群通知记录失败2');
}
foreach ($member_ids as $member_id) {
UsersChatList::where('uid', $member_id)->where('group_id', $group_id)->update(['status' => 0, 'updated_at' => date('Y-m-d H:i:s')]);
}
Db::commit();
} catch (Exception $e) {
Db::rollBack();

View File

@ -0,0 +1,122 @@
<?php
namespace App\Service;
use Hyperf\Di\Annotation\Inject;
use App\Amqp\Producer\ChatMessageProducer;
use App\Cache\LastMsgCache;
use App\Cache\UnreadTalkCache;
use App\Model\Chat\ChatRecord;
use App\Model\Group\UsersGroup;
use App\Model\UsersFriend;
use Hyperf\Amqp\Producer;
use Swoole\Http\Response;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;
class MessageHandleService
{
/**
* @Inject
* @var Producer
*/
private $producer;
/**
* @inject
* @var SocketFDService
*/
private $socketFDService;
/**
* 对话消息
*
* @param Response|Server $server
* @param Frame $frame
* @param array|string $data 解析后数据
* @return bool|void
*/
public function onTalk($server, Frame $frame, $data)
{
// 当前用户ID
$user_id = $this->socketFDService->findFdUserId($frame->fd);
if ($user_id != $data['send_user']) {
return;
}
//验证消息类型 私聊|群聊
if (!in_array($data['source_type'], [1, 2])) {
return;
}
//验证发送消息用户与接受消息用户之间是否存在好友或群聊关系(后期走缓存)
if ($data['source_type'] == 1) {//私信
//判断发送者和接受者是否是好友关系
if (!UsersFriend::isFriend(intval($data['send_user']), intval($data['receive_user']))) {
return;
}
} else if ($data['source_type'] == 2) {//群聊
//判断是否属于群成员
if (!UsersGroup::isMember(intval($data['receive_user']), intval($data['send_user']))) {
return;
}
}
$result = ChatRecord::create([
'source' => $data['source_type'],
'msg_type' => 1,
'user_id' => $data['send_user'],
'receive_id' => $data['receive_user'],
'content' => htmlspecialchars($data['text_message']),
'created_at' => date('Y-m-d H:i:s'),
]);
if (!$result) return;
// 判断是否私聊
if ($data['source_type'] == 1) {
$msg_text = mb_substr($result->content, 0, 30);
// 缓存最后一条消息
LastMsgCache::set([
'text' => $msg_text,
'created_at' => $result->created_at
], intval($data['receive_user']), intval($data['send_user']));
// 设置好友消息未读数
make(UnreadTalkCache::class)->setInc(intval($result->receive_id), strval($result->user_id));
}
$this->producer->produce(
new ChatMessageProducer('event_talk', [
'sender' => intval($data['send_user']), //发送者ID
'receive' => intval($data['receive_user']), //接收者ID
'source' => intval($data['source_type']), //接收者类型 1:好友;2:群组
'record_id' => $result->id
])
);
return true;
}
/**
* 键盘输入消息
*
* @param Response|Server $server
* @param Frame $frame
* @param array|string $data 解析后数据
* @return bool|void
*/
public function onKeyboard($server, Frame $frame, $data)
{
$this->producer->produce(
new ChatMessageProducer('event_keyboard', [
'send_user' => intval($data['send_user']), //发送者ID
'receive_user' => intval($data['receive_user']), //接收者ID
])
);
return true;
}
}

View File

@ -4,6 +4,7 @@ namespace App\Service;
use App\Cache\FriendRemarkCache;
use App\Cache\LastMsgCache;
use App\Cache\UnreadTalkCache;
use App\Model\Chat\ChatRecord;
use App\Model\Chat\ChatRecordsCode;
use App\Model\Chat\ChatRecordsFile;
@ -48,7 +49,11 @@ class TalkService extends BaseService
return [];
}
$rows = array_map(function ($item) use ($user_id) {
$socketFDService = make(SocketFDService::class);
$runIdAll = $socketFDService->getServerRunIdAll();
$rows = array_map(function ($item) use ($user_id, $socketFDService, $runIdAll) {
$data['id'] = $item['id'];
$data['type'] = $item['type'];
$data['friend_id'] = $item['friend_id'];
@ -66,14 +71,13 @@ class TalkService extends BaseService
if ($item['type'] == 1) {
$data['name'] = $item['nickname'];
$data['avatar'] = $item['user_avatar'];
// $data['unread_num'] = app('unread.talk')->get($user_id, $item['friend_id']);
// $data['online'] = app('client.manage')->isOnline($item['friend_id']);
$data['unread_num'] = make(UnreadTalkCache::class)->get($user_id, $item['friend_id']);
$data['online'] = $socketFDService->isOnlineAll($item['friend_id'], $runIdAll);
$remark = FriendRemarkCache::get($user_id, $item['friend_id']);
if ($remark) {
$data['remark_name'] = $remark;
} else {
$info = UsersFriend::select('user1', 'user2', 'user1_remark', 'user2_remark')
->where('user1', ($user_id < $item['friend_id']) ? $user_id : $item['friend_id'])
->where('user2', ($user_id < $item['friend_id']) ? $item['friend_id'] : $user_id)->first();
@ -89,7 +93,6 @@ class TalkService extends BaseService
}
$records = LastMsgCache::get($item['type'] == 1 ? $item['friend_id'] : $item['group_id'], $item['type'] == 1 ? $user_id : 0);
if ($records) {
$data['msg_text'] = $records['text'];
$data['updated_at'] = $records['created_at'];