优化代码

main
gzydong 2021-07-07 19:43:09 +08:00
parent 7ef588f9da
commit 5579b16548
14 changed files with 795 additions and 471 deletions

View File

@ -13,6 +13,8 @@ namespace App\Amqp\Consumer;
use App\Constants\TalkMsgType; use App\Constants\TalkMsgType;
use App\Constants\TalkType; use App\Constants\TalkType;
use App\Model\UsersFriendsApply;
use App\Service\UserService;
use Hyperf\Amqp\Annotation\Consumer; use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Result; use Hyperf\Amqp\Result;
use Hyperf\Amqp\Message\ConsumerMessage; use Hyperf\Amqp\Message\ConsumerMessage;
@ -20,7 +22,6 @@ use Hyperf\Amqp\Message\Type;
use Hyperf\Amqp\Builder\QueueBuilder; use Hyperf\Amqp\Builder\QueueBuilder;
use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Message\AMQPMessage;
use App\Model\User; use App\Model\User;
use App\Model\UsersFriend;
use App\Model\Chat\TalkRecords; use App\Model\Chat\TalkRecords;
use App\Model\Chat\TalkRecordsCode; use App\Model\Chat\TalkRecordsCode;
use App\Model\Chat\TalkRecordsFile; use App\Model\Chat\TalkRecordsFile;
@ -34,7 +35,7 @@ use App\Cache\SocketRoom;
/** /**
* 消息推送消费者队列 * 消息推送消费者队列
* @Consumer(name="ConsumerChat",enable=true) * @Consumer(name="ConsumerChat",enable=false)
*/ */
class ChatMessageConsumer extends ConsumerMessage class ChatMessageConsumer extends ConsumerMessage
{ {
@ -287,7 +288,9 @@ class ChatMessageConsumer extends ConsumerMessage
$status = (int)$data['data']['status']; $status = (int)$data['data']['status'];
$fds = []; $fds = [];
foreach (UsersFriend::getFriendIds($user_id) as $friend_id) {
$ids = container()->get(UserService::class)->getFriendIds($user_id);
foreach ($ids as $friend_id) {
$fds = array_merge($fds, $this->socketClientService->findUserFds(intval($friend_id))); $fds = array_merge($fds, $this->socketClientService->findUserFds(intval($friend_id)));
} }
@ -343,12 +346,38 @@ class ChatMessageConsumer extends ConsumerMessage
*/ */
public function onConsumeFriendApply(array $data, AMQPMessage $message): string public function onConsumeFriendApply(array $data, AMQPMessage $message): string
{ {
$fds = $this->socketClientService->findUserFds($data['data']['receive']); $data = $data['data'];
$this->socketPushNotify(array_unique($fds), json_encode([ $applyInfo = UsersFriendsApply::where('id', $data['apply_id'])->first();
SocketConstants::EVENT_FRIEND_APPLY, if (!$applyInfo) return Result::ACK;
$data['data']
])); $fds = $this->socketClientService->findUserFds($data['type'] == 1 ? $applyInfo->friend_id : $applyInfo->user_id);
if ($data['type'] == 1) {
$msg = [
'sender_id' => $applyInfo->user_id,
'receiver_id' => $applyInfo->friend_id,
'remark' => $applyInfo->remark,
];
} else {
$msg = [
'sender_id' => $applyInfo->friend_id,
'receiver_id' => $applyInfo->user_id,
'status' => $applyInfo->status,
'remark' => $applyInfo->remark,
];
}
$friendInfo = User::select(['id', 'avatar', 'nickname', 'mobile', 'motto'])->find($data['type'] == 1 ? $applyInfo->user_id : $applyInfo->friend_id);
$msg['friend'] = [
'user_id' => $friendInfo->id,
'avatar' => $friendInfo->avatar,
'nickname' => $friendInfo->nickname,
'mobile' => $friendInfo->mobile,
];
$this->socketPushNotify(array_unique($fds), json_encode([SocketConstants::EVENT_FRIEND_APPLY, $msg]));
return Result::ACK; return Result::ACK;
} }

View File

@ -54,4 +54,9 @@ class ChatMessageProducer extends ProducerMessage
$this->payload = $message; $this->payload = $message;
} }
public function getPayload()
{
return $this->payload;
}
} }

View File

@ -0,0 +1,171 @@
<?php
namespace App\Controller\Api\V1;
use App\Amqp\Producer\ChatMessageProducer;
use App\Cache\FriendApply;
use App\Constants\SocketConstants;
use App\Model\UsersFriendsApply;
use App\Service\SocketClientService;
use App\Service\UserService;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\Middleware;
use App\Middleware\JWTAuthMiddleware;
use Hyperf\HttpServer\Annotation\RequestMapping;
use Hyperf\Di\Annotation\Inject;
use App\Service\ContactApplyService;
use Psr\Http\Message\ResponseInterface;
/**
* Class ContactsApplyController
* @Controller(prefix="/api/v1/contacts/apply")
* @Middleware(JWTAuthMiddleware::class)
*
* @package App\Controller\Api\V1
*/
class ContactsApplyController extends CController
{
/**
* @Inject
* @var ContactApplyService
*/
private $service;
/**
* @inject
* @var SocketClientService
*/
private $socketClientService;
/**
* @RequestMapping(path="create", methods="post")
* @param UserService $userService
* @return ResponseInterface
*/
public function create(UserService $userService)
{
$params = $this->request->inputs(['friend_id', 'remarks']);
$this->validate($params, [
'friend_id' => 'required|integer',
'remarks' => 'present|max:50'
]);
$params['friend_id'] = (int)$params['friend_id'];
$user = $userService->findById($params['friend_id']);
if (!$user) {
return $this->response->fail('用户不存在!');
}
$user_id = $this->uid();
[$isTrue, $result] = $this->service->create($user_id, $params['friend_id'], $params['remarks']);
if (!$isTrue) {
return $this->response->fail('添加好友申请失败!');
}
// 好友申请未读消息数自增
FriendApply::getInstance()->incr($params['friend_id'], 1);
// 判断对方是否在线。如果在线发送消息通知
if ($this->socketClientService->isOnlineAll($params['friend_id'])) {
push_amqp(new ChatMessageProducer(SocketConstants::EVENT_FRIEND_APPLY, [
'apply_id' => $result->id,
'type' => 1,
]));
}
return $this->response->success([], '发送好友申请成功...');
}
/**
* @RequestMapping(path="accept", methods="post")
* @return ResponseInterface
*/
public function accept()
{
$params = $this->request->inputs(['apply_id', 'remarks']);
$this->validate($params, [
'apply_id' => 'required|integer',
'remarks' => 'present|max:20'
]);
$user_id = $this->uid();
$isTrue = $this->service->accept($user_id, intval($params['apply_id']), $params['remarks']);
if (!$isTrue) {
return $this->response->fail('处理失败!');
}
$friend_id = UsersFriendsApply::where('id', $params['apply_id'])->where('friend_id', $user_id)->value('user_id');
// 判断对方是否在线。如果在线发送消息通知
if ($this->socketClientService->isOnlineAll($friend_id)) {
push_amqp(new ChatMessageProducer(SocketConstants::EVENT_FRIEND_APPLY, [
'apply_id' => (int)$params['apply_id'],
'type' => 2,
]));
}
return $this->response->success([], '处理成功...');
}
/**
* @RequestMapping(path="decline", methods="post")
* @return ResponseInterface
*/
public function decline()
{
$params = $this->request->inputs(['apply_id', 'remarks']);
$this->validate($params, [
'apply_id' => 'required|integer',
'remarks' => 'present|max:20'
]);
$isTrue = $this->service->decline($this->uid(), intval($params['apply_id']), $params['remarks']);
return $isTrue
? $this->response->success()
: $this->response->fail();
}
/**
* @RequestMapping(path="delete", methods="post")
* @return ResponseInterface
*/
public function delete()
{
$params = $this->request->inputs(['apply_id']);
$this->validate($params, [
'apply_id' => 'required|integer'
]);
return $this->service->delete($this->uid(), intval($params['apply_id']))
? $this->response->success()
: $this->response->fail();
}
/**
* 获取联系人申请未读数
* @RequestMapping(path="records", methods="get")
*
* @return ResponseInterface
*/
public function records()
{
$params = $this->request->inputs(['page', 'page_size']);
$this->validate($params, [
'page' => 'present|integer',
'page_size' => 'present|integer'
]);
$page = $this->request->input('page', 1);
$page_size = $this->request->input('page_size', 10);
$user_id = $this->uid();
$data = $this->service->getApplyRecords($user_id, $page, $page_size);
FriendApply::getInstance()->rem(strval($user_id));
return $this->response->success($data);
}
}

View File

@ -11,18 +11,15 @@
namespace App\Controller\Api\V1; namespace App\Controller\Api\V1;
use App\Model\UsersFriend; use App\Model\UsersFriend;
use App\Service\UserService;
use Hyperf\Di\Annotation\Inject; use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller; use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\RequestMapping; use Hyperf\HttpServer\Annotation\RequestMapping;
use Hyperf\HttpServer\Annotation\Middleware; use Hyperf\HttpServer\Annotation\Middleware;
use App\Middleware\JWTAuthMiddleware; use App\Middleware\JWTAuthMiddleware;
use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\ResponseInterface;
use App\Amqp\Producer\ChatMessageProducer;
use App\Service\ContactsService; use App\Service\ContactsService;
use App\Service\SocketClientService; use App\Service\SocketClientService;
use App\Service\UserService;
use App\Constants\SocketConstants;
use App\Model\UsersFriendsApply;
use App\Model\TalkList; use App\Model\TalkList;
use App\Cache\FriendApply; use App\Cache\FriendApply;
use App\Cache\FriendRemark; use App\Cache\FriendRemark;
@ -41,13 +38,7 @@ class ContactsController extends CController
* @Inject * @Inject
* @var ContactsService * @var ContactsService
*/ */
private $contactsService; private $service;
/**
* @inject
* @var SocketClientService
*/
private $socketClientService;
/** /**
* 获取用户联系人列表 * 获取用户联系人列表
@ -55,61 +46,19 @@ class ContactsController extends CController
* *
* @return ResponseInterface * @return ResponseInterface
*/ */
public function getContacts() public function getContacts(UserService $service)
{ {
$rows = UsersFriend::getUserFriends($this->uid()); $rows = $service->getUserFriends($this->uid());
if ($rows) { if ($rows) {
$runArr = ServerRunID::getInstance()->getServerRunIdAll(); $runArr = ServerRunID::getInstance()->getServerRunIdAll();
foreach ($rows as $k => $row) { foreach ($rows as $k => $row) {
$rows[$k]['is_online'] = $this->socketClientService->isOnlineAll($row['id'], $runArr); $rows[$k]['is_online'] = container()->get(SocketClientService::class)->isOnlineAll($row['id'], $runArr);
} }
} }
return $this->response->success($rows); return $this->response->success($rows);
} }
/**
* 添加联系人
* @RequestMapping(path="add", methods="post")
*
* @param UserService $userService
* @return ResponseInterface
*/
public function addContact(UserService $userService)
{
$params = $this->request->inputs(['friend_id', 'remarks']);
$this->validate($params, [
'friend_id' => 'required|integer',
'remarks' => 'present|max:50'
]);
$user = $userService->findById($params['friend_id']);
if (!$user) {
return $this->response->fail('用户不存在!');
}
$user_id = $this->uid();
if (!$this->contactsService->addContact($user_id, intval($params['friend_id']), $params['remarks'])) {
return $this->response->fail('添加好友申请失败!');
}
// 好友申请未读消息数自增
FriendApply::getInstance()->incr($params['friend_id'], 1);
// 判断对方是否在线。如果在线发送消息通知
if ($this->socketClientService->isOnlineAll(intval($params['friend_id']))) {
push_amqp(new ChatMessageProducer(SocketConstants::EVENT_FRIEND_APPLY, [
'sender' => $user_id,
'receive' => intval($params['friend_id']),
'type' => 1,
'status' => 1,
'remark' => ''
]));
}
return $this->response->success([], '发送好友申请成功...');
}
/** /**
* 删除联系人 * 删除联系人
* @RequestMapping(path="delete", methods="post") * @RequestMapping(path="delete", methods="post")
@ -124,7 +73,7 @@ class ContactsController extends CController
]); ]);
$user_id = $this->uid(); $user_id = $this->uid();
if (!$this->contactsService->deleteContact($user_id, intval($params['friend_id']))) { if (!$this->service->delete($user_id, intval($params['friend_id']))) {
return $this->response->fail('好友关系解除失败!'); return $this->response->fail('好友关系解除失败!');
} }
@ -132,91 +81,11 @@ class ContactsController extends CController
TalkList::delItem($user_id, $params['friend_id'], 2); TalkList::delItem($user_id, $params['friend_id'], 2);
TalkList::delItem($params['friend_id'], $user_id, 2); TalkList::delItem($params['friend_id'], $user_id, 2);
// ... TODO 推送消息(待完善) // TODO 推送消息(待完善)
return $this->response->success([], '好友关系解除成功...'); return $this->response->success([], '好友关系解除成功...');
} }
/**
* 同意添加联系人
* @RequestMapping(path="accept-invitation", methods="post")
*
* @return ResponseInterface
*/
public function acceptInvitation()
{
$params = $this->request->inputs(['apply_id', 'remarks']);
$this->validate($params, [
'apply_id' => 'required|integer',
'remarks' => 'present|max:20'
]);
$user_id = $this->uid();
$isTrue = $this->contactsService->acceptInvitation($user_id, intval($params['apply_id']), $params['remarks']);
if (!$isTrue) {
return $this->response->fail('处理失败!');
}
$friend_id = $info = UsersFriendsApply::where('id', $params['apply_id'])
->where('friend_id', $user_id)
->value('user_id');
// 判断对方是否在线。如果在线发送消息通知
if ($this->socketClientService->isOnlineAll($friend_id)) {
// TODO 待完善
push_amqp(new ChatMessageProducer(SocketConstants::EVENT_FRIEND_APPLY, [
'sender' => $user_id,
'receive' => $friend_id,
'type' => 1,
'status' => 1,
'remark' => ''
]));
}
return $this->response->success([], '处理成功...');
}
/**
* 拒绝添加联系人(预留)
* @RequestMapping(path="decline-invitation", methods="post")
*
* @return ResponseInterface
*/
public function declineInvitation()
{
$params = $this->request->inputs(['apply_id', 'remarks']);
$this->validate($params, [
'apply_id' => 'required|integer',
'remarks' => 'present|max:20'
]);
$isTrue = $this->contactsService->declineInvitation($this->uid(), intval($params['apply_id']), $params['remarks']);
return $isTrue
? $this->response->success()
: $this->response->fail();
}
/**
* 删除联系人申请记录
* @RequestMapping(path="delete-apply", methods="post")
*
* @return ResponseInterface
*/
public function deleteContactApply()
{
$params = $this->request->inputs(['apply_id']);
$this->validate($params, [
'apply_id' => 'required|integer'
]);
$isTrue = $this->contactsService->delContactApplyRecord($this->uid(), intval($params['apply_id']));
return $isTrue
? $this->response->success()
: $this->response->fail();
}
/** /**
* 获取联系人申请未读数 * 获取联系人申请未读数
* @RequestMapping(path="apply-unread-num", methods="get") * @RequestMapping(path="apply-unread-num", methods="get")
@ -230,31 +99,6 @@ class ContactsController extends CController
]); ]);
} }
/**
* 获取联系人申请未读数
* @RequestMapping(path="apply-records", methods="get")
*
* @return ResponseInterface
*/
public function getContactApplyRecords()
{
$params = $this->request->inputs(['page', 'page_size']);
$this->validate($params, [
'page' => 'present|integer',
'page_size' => 'present|integer'
]);
$page = $this->request->input('page', 1);
$page_size = $this->request->input('page_size', 10);
$user_id = $this->uid();
$data = $this->contactsService->getContactApplyRecords($user_id, $page, $page_size);
FriendApply::getInstance()->rem(strval($user_id));
return $this->response->success($data);
}
/** /**
* 搜索联系人 * 搜索联系人
* @RequestMapping(path="search", methods="get") * @RequestMapping(path="search", methods="get")
@ -268,7 +112,7 @@ class ContactsController extends CController
'mobile' => "present|regex:/^1[3456789][0-9]{9}$/" 'mobile' => "present|regex:/^1[3456789][0-9]{9}$/"
]); ]);
$result = $this->contactsService->findContact($params['mobile']); $result = $this->service->findContact($params['mobile']);
return $this->response->success($result); return $this->response->success($result);
} }
@ -287,7 +131,7 @@ class ContactsController extends CController
]); ]);
$user_id = $this->uid(); $user_id = $this->uid();
$isTrue = $this->contactsService->editContactRemark($user_id, intval($params['friend_id']), $params['remarks']); $isTrue = $this->service->editRemark($user_id, intval($params['friend_id']), $params['remarks']);
if (!$isTrue) { if (!$isTrue) {
return $this->response->fail('备注修改失败!'); return $this->response->fail('备注修改失败!');
} }

View File

@ -12,6 +12,7 @@ namespace App\Controller\Api\V1;
use App\Cache\SocketRoom; use App\Cache\SocketRoom;
use App\Constants\TalkType; use App\Constants\TalkType;
use App\Service\UserService;
use Hyperf\Di\Annotation\Inject; use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller; use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\RequestMapping; use Hyperf\HttpServer\Annotation\RequestMapping;
@ -329,10 +330,10 @@ class GroupController extends CController
* *
* @return ResponseInterface * @return ResponseInterface
*/ */
public function getInviteFriends() public function getInviteFriends(UserService $service)
{ {
$group_id = $this->request->input('group_id', 0); $group_id = $this->request->input('group_id', 0);
$friends = UsersFriend::getUserFriends($this->uid()); $friends = $service->getUserFriends($this->uid());
if ($group_id > 0 && $friends) { if ($group_id > 0 && $friends) {
if ($ids = GroupMember::getGroupMemberIds($group_id)) { if ($ids = GroupMember::getGroupMemberIds($group_id)) {
foreach ($friends as $k => $item) { foreach ($friends as $k => $item) {

View File

@ -4,6 +4,8 @@ declare (strict_types=1);
namespace App\Model; namespace App\Model;
use App\Cache\FriendRemark;
/** /**
* 表情包收藏数据表模型 * 表情包收藏数据表模型
* *
@ -24,6 +26,7 @@ class UsersFriend extends BaseModel
'user_id', 'user_id',
'friend_id', 'friend_id',
'status', 'status',
'remark',
'created_at', 'created_at',
'updated_at', 'updated_at',
]; ];
@ -37,35 +40,16 @@ class UsersFriend extends BaseModel
'updated_at' => 'datetime', 'updated_at' => 'datetime',
]; ];
/**
* 获取用户所有好友
*
* @param int $user_id 用户ID
* @return array
*/
public static function getUserFriends(int $user_id)
{
return UsersFriend::leftJoin('users', 'users.id', '=', 'users_friends.friend_id')
->where('user_id', $user_id)->where('users_friends.status', 1)
->get([
'users.id',
'users.nickname',
'users.avatar',
'users.motto',
'users.gender',
'users_friends.remark as friend_remark',
])->toArray();
}
/** /**
* 判断用户之间是否存在好友关系 * 判断用户之间是否存在好友关系
* *
* @param int $user_id 用户ID * @param int $user_id 用户ID
* @param int $friend_id 好友ID * @param int $friend_id 好友ID
* @param bool $is_cache 是否允许读取缓存 * @param bool $is_cache 是否允许读取缓存
* @param bool $is_mutual 相互互为好友
* @return bool * @return bool
*/ */
public static function isFriend(int $user_id, int $friend_id, bool $is_cache = false) public static function isFriend(int $user_id, int $friend_id, bool $is_cache = false, $is_mutual = false)
{ {
$cacheKey = "good_friends:{$user_id}_{$friend_id}"; $cacheKey = "good_friends:{$user_id}_{$friend_id}";
if ($is_cache && redis()->get($cacheKey)) { if ($is_cache && redis()->get($cacheKey)) {
@ -81,13 +65,20 @@ class UsersFriend extends BaseModel
} }
/** /**
* 获取指定用户的所有朋友的用户ID * 获取好友备注
* *
* @param int $user_id 指定用户ID * @param int $user_id 用户ID
* @return array * @param int $friend_id 好友ID
* @return string
*/ */
public static function getFriendIds(int $user_id) public static function getFriendRemark(int $user_id, int $friend_id)
{ {
return UsersFriend::where('user_id', $user_id)->where('status', 1)->pluck('friend_id')->toArray(); $remark = FriendRemark::getInstance()->read($user_id, $friend_id);
if ($remark) return $remark;
$remark = UsersFriend::where('user_id', $user_id)->where('friend_id', $friend_id)->value('remark');
if ($remark) FriendRemark::getInstance()->save($user_id, $friend_id, $remark);
return (string)$remark;
} }
} }

View File

@ -11,7 +11,7 @@ namespace App\Model;
* @property int $user_id 用户ID * @property int $user_id 用户ID
* @property int $friend_id 朋友ID * @property int $friend_id 朋友ID
* @property int $status 申请状态 * @property int $status 申请状态
* @property string $remarks 备注说明 * @property string $remark 备注说明
* @property string $created_at 创建时间 * @property string $created_at 创建时间
* @property string $updated_at 更新时间 * @property string $updated_at 更新时间
* @package App\Model * @package App\Model

View File

@ -1,43 +0,0 @@
<?php
declare(strict_types=1);
namespace App\Process;
use Hyperf\Process\AbstractProcess;
use Hyperf\Process\Annotation\Process;
/**
* @Process(name="RedisSubscribe")
*/
class RedisSubscribe extends AbstractProcess
{
/**
* 订阅的通道
*
* @var string[]
*/
private $chans = ['websocket'];
public function handle(): void
{
redis()->subscribe($this->chans, [$this, 'subscribe']);
}
/**
* 订阅处理逻辑
*
* @param $redis
* @param string $chan
* @param string $msg
*/
public function subscribe($redis, string $chan, string $msg)
{
echo PHP_EOL . "chan : $chan , msg : $msg";
}
public function isEnable($server): bool
{
return false;
}
}

View File

@ -0,0 +1,385 @@
<?php
declare(strict_types=1);
namespace App\Process;
use App\Cache\SocketRoom;
use App\Constants\SocketConstants;
use App\Constants\TalkMsgType;
use App\Constants\TalkType;
use App\Model\Chat\TalkRecords;
use App\Model\Chat\TalkRecordsCode;
use App\Model\Chat\TalkRecordsFile;
use App\Model\Chat\TalkRecordsForward;
use App\Model\Chat\TalkRecordsInvite;
use App\Model\Group\Group;
use App\Model\User;
use App\Model\UsersFriendsApply;
use App\Service\SocketClientService;
use App\Service\UserService;
use Hyperf\Amqp\Result;
use Hyperf\Process\AbstractProcess;
use Hyperf\Process\Annotation\Process;
/**
* @Process(name="RedisWebsocketSubscribe")
*/
class RedisWebsocketSubscribe extends AbstractProcess
{
/**
* 订阅的通道
*
* @var string[]
*/
private $chans = ['websocket'];
/**
* 消息事件与回调事件绑定
*
* @var array
*/
const EVENTS = [
// 聊天消息事件
SocketConstants::EVENT_TALK => 'onConsumeTalk',
// 键盘输入事件
SocketConstants::EVENT_KEYBOARD => 'onConsumeKeyboard',
// 用户在线状态事件
SocketConstants::EVENT_ONLINE_STATUS => 'onConsumeOnlineStatus',
// 聊天消息推送事件
SocketConstants::EVENT_REVOKE_TALK => 'onConsumeRevokeTalk',
// 好友申请相关事件
SocketConstants::EVENT_FRIEND_APPLY => 'onConsumeFriendApply'
];
/**
* @var SocketClientService
*/
private $socketClientService;
public function handle(): void
{
$this->socketClientService = container()->get(SocketClientService::class);
redis()->subscribe($this->chans, [$this, 'subscribe']);
}
/**
* 订阅处理逻辑
*
* @param $redis
* @param string $chan
* @param string $message
*/
public function subscribe($redis, string $chan, string $message)
{
//echo PHP_EOL . "chan : $chan , msg : $message";
$data = json_decode($message, true);
$this->{self::EVENTS[$data['event']]}($data);
}
public function isEnable($server): bool
{
return true;
}
/**
* 对话聊天消息
*
* @param array $data 队列消息
* @return string
*/
public function onConsumeTalk(array $data): string
{
$talk_type = $data['data']['talk_type'];
$sender_id = $data['data']['sender_id'];
$receiver_id = $data['data']['receiver_id'];
$record_id = $data['data']['record_id'];
$fds = [];
$groupInfo = null;
if ($talk_type == TalkType::PRIVATE_CHAT) {
$fds = array_merge(
$this->socketClientService->findUserFds($sender_id),
$this->socketClientService->findUserFds($receiver_id)
);
} else if ($talk_type == TalkType::GROUP_CHAT) {
foreach (SocketRoom::getInstance()->getRoomMembers(strval($receiver_id)) as $uid) {
$fds = array_merge($fds, $this->socketClientService->findUserFds(intval($uid)));
}
$groupInfo = Group::where('id', $receiver_id)->first(['group_name', 'avatar']);
}
// 客户端ID去重
if (!$fds = array_unique($fds)) {
return Result::ACK;
}
$result = TalkRecords::leftJoin('users', 'users.id', '=', 'talk_records.user_id')
->where('talk_records.id', $record_id)
->first([
'talk_records.id',
'talk_records.talk_type',
'talk_records.msg_type',
'talk_records.user_id',
'talk_records.receiver_id',
'talk_records.content',
'talk_records.is_revoke',
'talk_records.created_at',
'users.nickname',
'users.avatar',
]);
if (!$result) return Result::ACK;
$file = $code_block = $forward = $invite = [];
switch ($result->msg_type) {
case TalkMsgType::FILE_MESSAGE:
$file = TalkRecordsFile::where('record_id', $result->id)->first([
'id', 'record_id', 'user_id', 'file_source', 'file_type',
'save_type', 'original_name', 'file_suffix', 'file_size', 'save_dir'
]);
$file = $file ? $file->toArray() : [];
$file && $file['file_url'] = get_media_url($file['save_dir']);
break;
case TalkMsgType::FORWARD_MESSAGE:
$forward = ['num' => 0, 'list' => []];
$forwardInfo = TalkRecordsForward::where('record_id', $result->id)->first(['records_id', 'text']);
if ($forwardInfo) {
$forward = [
'num' => count(parse_ids($forwardInfo->records_id)),
'list' => json_decode($forwardInfo->text, true) ?? []
];
}
break;
case TalkMsgType::CODE_MESSAGE:
$code_block = TalkRecordsCode::where('record_id', $result->id)->first(['record_id', 'code_lang', 'code']);
$code_block = $code_block ? $code_block->toArray() : [];
break;
case TalkMsgType::GROUP_INVITE_MESSAGE:
$notifyInfo = TalkRecordsInvite::where('record_id', $result->id)->first([
'record_id', 'type', 'operate_user_id', 'user_ids'
]);
$userInfo = User::where('id', $notifyInfo->operate_user_id)->first(['nickname', 'id']);
$invite = [
'type' => $notifyInfo->type,
'operate_user' => ['id' => $userInfo->id, 'nickname' => $userInfo->nickname],
'users' => User::whereIn('id', parse_ids($notifyInfo->user_ids))->get(['id', 'nickname'])->toArray()
];
unset($notifyInfo, $userInfo);
break;
}
$notify = [
'sender_id' => $sender_id,
'receiver_id' => $receiver_id,
'talk_type' => $talk_type,
'data' => $this->formatTalkMessage([
'id' => $result->id,
'talk_type' => $result->talk_type,
'msg_type' => $result->msg_type,
"user_id" => $result->user_id,
"receiver_id" => $result->receiver_id,
'avatar' => $result->avatar,
'nickname' => $result->nickname,
'group_name' => $groupInfo ? $groupInfo->group_name : '',
'group_avatar' => $groupInfo ? $groupInfo->avatar : '',
"created_at" => $result->created_at,
"content" => $result->content,
"file" => $file,
"code_block" => $code_block,
'forward' => $forward,
'invite' => $invite
])
];
$this->socketPushNotify($fds, json_encode([SocketConstants::EVENT_TALK, $notify]));
return Result::ACK;
}
/**
* 键盘输入事件消息
*
* @param array $data 队列消息
* @return string
*/
public function onConsumeKeyboard(array $data): string
{
$fds = $this->socketClientService->findUserFds($data['data']['receiver_id']);
$this->socketPushNotify($fds, json_encode([SocketConstants::EVENT_KEYBOARD, $data['data']]));
return Result::ACK;
}
/**
* 用户上线或下线消息
*
* @param array $data 队列消息
* @return string
*/
public function onConsumeOnlineStatus(array $data): string
{
$user_id = (int)$data['data']['user_id'];
$status = (int)$data['data']['status'];
$fds = [];
$ids = container()->get(UserService::class)->getFriendIds($user_id);
foreach ($ids as $friend_id) {
$fds = array_merge($fds, $this->socketClientService->findUserFds(intval($friend_id)));
}
$this->socketPushNotify(array_unique($fds), json_encode([
SocketConstants::EVENT_ONLINE_STATUS, [
'user_id' => $user_id,
'status' => $status
]
]));
return Result::ACK;
}
/**
* 撤销聊天消息
*
* @param array $data 队列消息
* @return string
*/
public function onConsumeRevokeTalk(array $data): string
{
$record = TalkRecords::where('id', $data['data']['record_id'])->first(['id', 'talk_type', 'user_id', 'receiver_id']);
$fds = [];
if ($record->talk_type == TalkType::PRIVATE_CHAT) {
$fds = array_merge($fds, $this->socketClientService->findUserFds($record->user_id));
$fds = array_merge($fds, $this->socketClientService->findUserFds($record->receiver_id));
} else if ($record->talk_type == TalkType::GROUP_CHAT) {
$userIds = SocketRoom::getInstance()->getRoomMembers(strval($record->receiver_id));
foreach ($userIds as $uid) {
$fds = array_merge($fds, $this->socketClientService->findUserFds((int)$uid));
}
}
$fds = array_unique($fds);
$this->socketPushNotify($fds, json_encode([SocketConstants::EVENT_REVOKE_TALK, [
'talk_type' => $record->talk_type,
'sender_id' => $record->user_id,
'receiver_id' => $record->receiver_id,
'record_id' => $record->id,
]]));
return Result::ACK;
}
/**
* 好友申请消息
*
* @param array $data 队列消息
* @return string
*/
public function onConsumeFriendApply(array $data): string
{
$data = $data['data'];
$applyInfo = UsersFriendsApply::where('id', $data['apply_id'])->first();
if (!$applyInfo) return Result::ACK;
$fds = $this->socketClientService->findUserFds($data['type'] == 1 ? $applyInfo->friend_id : $applyInfo->user_id);
if ($data['type'] == 1) {
$msg = [
'sender_id' => $applyInfo->user_id,
'receiver_id' => $applyInfo->friend_id,
'remark' => $applyInfo->remark,
];
} else {
$msg = [
'sender_id' => $applyInfo->friend_id,
'receiver_id' => $applyInfo->user_id,
'status' => $applyInfo->status,
'remark' => $applyInfo->remark,
];
}
$friendInfo = User::select(['id', 'avatar', 'nickname', 'mobile', 'motto'])->find($data['type'] == 1 ? $applyInfo->user_id : $applyInfo->friend_id);
$msg['friend'] = [
'user_id' => $friendInfo->id,
'avatar' => $friendInfo->avatar,
'nickname' => $friendInfo->nickname,
'mobile' => $friendInfo->mobile,
];
$this->socketPushNotify(array_unique($fds), json_encode([SocketConstants::EVENT_FRIEND_APPLY, $msg]));
return Result::ACK;
}
/**
* WebSocket 消息推送
*
* @param $fds
* @param $message
*/
private function socketPushNotify($fds, $message)
{
$server = server();
foreach ($fds as $fd) {
$server->exist(intval($fd)) && $server->push(intval($fd), $message);
}
}
/**
* 格式化对话的消息体
*
* @param array $data 对话的消息
* @return array
*/
private function formatTalkMessage(array $data): array
{
$message = [
"id" => 0, // 消息记录ID
"talk_type" => 1, // 消息来源[1:好友私信;2:群聊]
"msg_type" => 1, // 消息类型
"user_id" => 0, // 发送者用户ID
"receiver_id" => 0, // 接收者ID[好友ID或群ID]
// 发送消息人的信息
"nickname" => "",// 用户昵称
"avatar" => "",// 用户头像
"group_name" => "",// 群组名称
"group_avatar" => "",// 群组头像
// 不同的消息类型
"file" => [],
"code_block" => [],
"forward" => [],
"invite" => [],
// 消息创建时间
"content" => '',// 文本消息
"created_at" => "",
// 消息属性
"is_revoke" => 0, // 消息是否撤销
];
return array_merge($message, array_intersect_key($data, $message));
}
}

View File

@ -2,54 +2,144 @@
namespace App\Service; namespace App\Service;
use App\Model\User;
use App\Model\UsersFriend;
use App\Model\UsersFriendsApply; use App\Model\UsersFriendsApply;
use App\Traits\PagingTrait;
use Hyperf\DbConnection\Db;
class ContactApplyService class ContactApplyService
{ {
use PagingTrait;
/** /**
* 创建好友申请 * 创建好友申请
* *
* @param int $user_id 申请人用户ID * @param int $user_id 用户ID
* @param int $friend_id 朋友ID * @param int $friend_id 朋友ID
* @param string $remark 申请备注 * @param string $remark 申请备注
* @return bool * @return array
*/ */
public function create(int $user_id, int $friend_id, string $remark) public function create(int $user_id, int $friend_id, string $remark)
{ {
// 查询最后一次联系人申请
$result = UsersFriendsApply::where('user_id', $user_id)->where('friend_id', $friend_id)->orderBy('id', 'desc')->first();
if ($result && $result->status == 0) {
$result->remarks = $remark;
$result->updated_at = date('Y-m-d H:i:s');
$result->save();
return true;
}
$result = UsersFriendsApply::create([ $result = UsersFriendsApply::create([
'user_id' => $user_id, 'user_id' => $user_id,
'friend_id' => $friend_id, 'friend_id' => $friend_id,
'status' => 0, 'status' => 0,
'remarks' => $remark, 'remark' => $remark,
'created_at' => date('Y-m-d H:i:s'), 'created_at' => date('Y-m-d H:i:s'),
'updated_at' => date('Y-m-d H:i:s') 'updated_at' => date('Y-m-d H:i:s'),
]); ]);
return (bool)$result; if (!$result) {
return [false, $result];
}
return [true, $result];
} }
/** /**
* 同意好友申请 * 同意好友申请
*
* @param int $user_id 用户ID
* @param int $apply_id 申请记录ID
*/ */
public function accept() public function accept(int $user_id, int $apply_id, string $remarks = '')
{ {
$info = UsersFriendsApply::where('id', $apply_id)->first();
if (!$info || $info->friend_id != $user_id) {
return false;
}
Db::beginTransaction();
try {
$info->status = 1;
$info->save();
UsersFriend::updateOrCreate([
'user_id' => $info->user_id,
'friend_id' => $info->friend_id,
], [
'status' => 1,
'remark' => $remarks,
]);
UsersFriend::updateOrCreate([
'user_id' => $info->friend_id,
'friend_id' => $info->user_id,
], [
'status' => 1,
'remark' => User::where('id', $info->user_id)->value('nickname'),
]);
Db::commit();
} catch (\Exception $e) {
Db::rollBack();
echo $e->getMessage();
return false;
}
return true;
} }
/** /**
* 拒绝好友申请 * 拒绝好友申请
*
* @param int $user_id 用户ID
* @param int $apply_id 申请记录ID
*/ */
public function decline() public function decline(int $user_id, int $apply_id, string $reason = '')
{ {
return (bool)UsersFriendsApply::where('id', $apply_id)->where('friend_id', $user_id)->update([
'status' => 2,
'reason' => $reason,
'updated_at' => date('Y-m-d H:i:s'),
]);
}
/**
* 删除好友申请
*
* @param int $user_id
* @param int $apply_id
* @return bool
*/
public function delete(int $user_id, int $apply_id)
{
return (bool)UsersFriendsApply::where('id', $apply_id)->where('friend_id', $user_id)->delete();
}
/**
* 获取联系人申请记录
*
* @param int $user_id 用户ID
* @param int $page 当前分页
* @param int $page_size 分页大小
* @return array
*/
public function getApplyRecords(int $user_id, $page = 1, $page_size = 30): array
{
$rowsSqlObj = UsersFriendsApply::select([
'users_friends_apply.id',
'users_friends_apply.status',
'users_friends_apply.remark',
'users.nickname',
'users.avatar',
'users.mobile',
'users_friends_apply.user_id',
'users_friends_apply.friend_id',
'users_friends_apply.created_at'
]);
$rowsSqlObj->leftJoin('users', 'users.id', '=', 'users_friends_apply.user_id');
$rowsSqlObj->where('users_friends_apply.friend_id', $user_id);
$count = $rowsSqlObj->count();
$rows = [];
if ($count > 0) {
$rows = $rowsSqlObj->orderBy('users_friends_apply.id', 'desc')->forPage($page, $page_size)->get()->toArray();
}
return $this->getPagingRows($rows, $count, $page, $page_size);
} }
} }

View File

@ -12,10 +12,7 @@ namespace App\Service;
use App\Model\User; use App\Model\User;
use App\Model\UsersFriend; use App\Model\UsersFriend;
use App\Model\UsersFriendsApply;
use App\Traits\PagingTrait; use App\Traits\PagingTrait;
use Hyperf\DbConnection\Db;
use Exception;
/** /**
* ContactsService * ContactsService
@ -27,43 +24,6 @@ class ContactsService extends BaseService
{ {
use PagingTrait; use PagingTrait;
/**
* 添加联系人/申请
*
* @param int $user_id 用户ID
* @param int $friend_id 好友ID
* @param string $remarks 申请备注
* @return bool
*/
public function addContact(int $user_id, int $friend_id, string $remarks): bool
{
// 判断是否是好友关系
if (UsersFriend::isFriend($user_id, $friend_id)) return true;
// 查询最后一次联系人申请
$result = UsersFriendsApply::where('user_id', $user_id)
->where('friend_id', $friend_id)
->orderBy('id', 'desc')->first();
if ($result && $result->status == 0) {
$result->remarks = $remarks;
$result->updated_at = date('Y-m-d H:i:s');
$result->save();
return true;
} else {
$result = UsersFriendsApply::create([
'user_id' => $user_id,
'friend_id' => $friend_id,
'status' => 0,
'remarks' => $remarks,
'created_at' => date('Y-m-d H:i:s'),
'updated_at' => date('Y-m-d H:i:s')
]);
return (bool)$result;
}
}
/** /**
* 删除联系人 * 删除联系人
* *
@ -71,102 +31,16 @@ class ContactsService extends BaseService
* @param int $friend_id 好友ID * @param int $friend_id 好友ID
* @return bool * @return bool
*/ */
public function deleteContact(int $user_id, int $friend_id): bool public function delete(int $user_id, int $friend_id): bool
{ {
if (!UsersFriend::isFriend($user_id, $friend_id)) { $res = (bool)UsersFriend::where('user_id', $user_id)->where('friend_id', $friend_id)->where('status', 1)->update([
return false; 'status' => 0,
}
$data = ['status' => 0];
// 用户ID比大小交换位置
if ($user_id > $friend_id) {
[$user_id, $friend_id] = [$friend_id, $user_id];
}
return (bool)UsersFriend::where('user1', $user_id)->where('user2', $friend_id)->update($data);
}
/**
* 同意添加联系人 / 联系人申请
*
* @param int $user_id 用户ID
* @param int $apply_id 联系人申请ID
* @param string $remarks 联系人备注名称
* @return bool
*/
public function acceptInvitation(int $user_id, int $apply_id, string $remarks = ''): bool
{
$info = UsersFriendsApply::where('id', $apply_id)
->where('friend_id', $user_id)
->where('status', 0)
->orderBy('id', 'desc')
->first(['user_id', 'friend_id']);
if (!$info) return false;
Db::beginTransaction();
try {
$res = UsersFriendsApply::where('id', $apply_id)->update(['status' => 1, 'updated_at' => date('Y-m-d H:i:s')]);
if (!$res) {
throw new Exception('更新好友申请表信息失败');
}
// 判断大小 交换 user1,user2 的位置
[$user1, $user2] = $info->user_id < $info->friend_id ? [$info->user_id, $info->friend_id] : [$info->friend_id, $info->user_id];
// 查询是否存在好友记录
$friendResult = UsersFriend::where([
['user1', '=', $user1],
['user2', '=', $user2]
])->first(['id', 'user1', 'user2', 'active', 'status']);
if ($friendResult) {
$active = ($friendResult->user1 == $info->user_id && $friendResult->user2 == $info->friend_id) ? 1 : 2;
UsersFriend::where('id', $friendResult->id)->update(['active' => $active, 'status' => 1]);
} else {
$friend_nickname = User::where('id', $info->friend_id)->value('nickname');
UsersFriend::create([
'user1' => $user1,
'user2' => $user2,
'user1_remark' => $user1 == $user_id ? $remarks : $friend_nickname,
'user2_remark' => $user2 == $user_id ? $remarks : $friend_nickname,
'active' => $user1 == $user_id ? 2 : 1,
'status' => 1,
'agree_time' => date('Y-m-d H:i:s'),
'created_at' => date('Y-m-d H:i:s')
]);
}
Db::commit();
} catch (Exception $e) {
Db::rollBack();
return false;
}
return true;
}
/**
* 拒绝添加联系人 / 联系人申请
*
* @param int $user_id 用户ID
* @param int $apply_id 联系人申请ID
* @param string $remarks 拒绝申请备注信息
* @return bool
*/
public function declineInvitation(int $user_id, int $apply_id, string $remarks = ''): bool
{
return (bool)UsersFriendsApply::where([
['id', '=', $apply_id],
['user_id', '=', $user_id],
['status', '=', 2],
])->update([
'status' => 2,
'remarks' => $remarks,
'updated_at' => date('Y-m-d H:i:s') 'updated_at' => date('Y-m-d H:i:s')
]); ]);
if ($res) redis()->del("good_friends:{$user_id}_{$friend_id}");
return $res;
} }
/** /**
@ -177,9 +51,12 @@ class ContactsService extends BaseService
* @param string $remark 好友备注名称 * @param string $remark 好友备注名称
* @return bool * @return bool
*/ */
public function editContactRemark(int $user_id, int $friend_id, string $remark): bool public function editRemark(int $user_id, int $friend_id, string $remark): bool
{ {
return (bool)UsersFriend::where('user_id', $user_id)->where('friend_id', $friend_id)->update(['remark' => $remark]); return (bool)UsersFriend::where('user_id', $user_id)->where('friend_id', $friend_id)->update([
'remark' => $remark,
'updated_at' => date('Y-m-d H:i:s')
]);
} }
/** /**
@ -194,51 +71,4 @@ class ContactsService extends BaseService
return $user ? $user->toArray() : []; return $user ? $user->toArray() : [];
} }
/**
* 获取联系人申请记录
*
* @param int $user_id 用户ID
* @param int $page 当前分页
* @param int $page_size 分页大小
* @return array
*/
public function getContactApplyRecords(int $user_id, $page = 1, $page_size = 30): array
{
$rowsSqlObj = UsersFriendsApply::select([
'users_friends_apply.id',
'users_friends_apply.status',
'users_friends_apply.remarks',
'users.nickname',
'users.avatar',
'users.mobile',
'users_friends_apply.user_id',
'users_friends_apply.friend_id',
'users_friends_apply.created_at'
]);
$rowsSqlObj->leftJoin('users', 'users.id', '=', 'users_friends_apply.user_id');
$rowsSqlObj->where('users_friends_apply.friend_id', $user_id);
$count = $rowsSqlObj->count();
$rows = [];
if ($count > 0) {
$rows = $rowsSqlObj->orderBy('users_friends_apply.id', 'desc')->forPage($page, $page_size)->get()->toArray();
}
return $this->getPagingRows($rows, $count, $page, $page_size);
}
/**
* 删除联系人申请记录
*
* @param int $user_id 用户ID
* @param int $apply_id 联系人好友申请ID
* @return bool
* @throws Exception
*/
public function delContactApplyRecord(int $user_id, int $apply_id): bool
{
return (bool)UsersFriendsApply::where('id', $apply_id)->where('friend_id', $user_id)->delete();
}
} }

View File

@ -26,24 +26,6 @@ class TalkService extends BaseService
{ {
use PagingTrait; use PagingTrait;
/**
* 获取好友备注
*
* @param int $user_id 用户ID
* @param int $friend_id 好友ID
* @return string
*/
public function getFriendRemark(int $user_id, int $friend_id)
{
$remark = FriendRemark::getInstance()->read($user_id, $friend_id);
if ($remark) return $remark;
$remark = UsersFriend::where('user_id', $user_id)->where('friend_id', $friend_id)->value('remark');
if ($remark) FriendRemark::getInstance()->save($user_id, $friend_id, $remark);
return (string)$remark;
}
/** /**
* 获取用户的聊天列表 * 获取用户的聊天列表
* *
@ -95,7 +77,7 @@ class TalkService extends BaseService
$data['avatar'] = $item['user_avatar']; $data['avatar'] = $item['user_avatar'];
$data['unread_num'] = UnreadTalk::getInstance()->read($item['receiver_id'], $user_id); $data['unread_num'] = UnreadTalk::getInstance()->read($item['receiver_id'], $user_id);
$data['is_online'] = $socketFDService->isOnlineAll($item['receiver_id'], $runIdAll); $data['is_online'] = $socketFDService->isOnlineAll($item['receiver_id'], $runIdAll);
$data['remark_name'] = $this->getFriendRemark($user_id, (int)$item['receiver_id']); $data['remark_name'] = UsersFriend::getFriendRemark($user_id, (int)$item['receiver_id']);
} else { } else {
$data['name'] = strval($item['group_name']); $data['name'] = strval($item['group_name']);
$data['avatar'] = $item['group_avatar']; $data['avatar'] = $item['group_avatar'];

View File

@ -118,21 +118,17 @@ class UserService extends BaseService
if (!$info) return []; if (!$info) return [];
$info = $info->toArray(); $info = $info->toArray();
$info['friend_status'] = 0;//朋友关系状态 0:本人 1:陌生人 2:朋友 $info['friend_status'] = 0;//朋友关系状态[0:本人;1:陌生人;2:朋友;]
$info['nickname_remark'] = ''; $info['nickname_remark'] = '';
$info['friend_apply'] = 0; $info['friend_apply'] = 0;
// 判断查询信息是否是自己 // 判断查询信息是否是自己
if ($friend_id != $me_user_id) { if ($friend_id != $me_user_id) {
$friendInfo = UsersFriend:: $is_friend = UsersFriend::isFriend($me_user_id, $friend_id, true, true);
where('user1', '=', $friend_id > $me_user_id ? $me_user_id : $friend_id)
->where('user2', '=', $friend_id < $me_user_id ? $me_user_id : $friend_id)
->where('status', 1)
->first(['id', 'user1', 'user2', 'active', 'user1_remark', 'user2_remark']);
$info['friend_status'] = $friendInfo ? 2 : 1; $info['friend_status'] = $is_friend ? 2 : 1;
if ($friendInfo) { if ($is_friend) {
$info['nickname_remark'] = $friendInfo->user1 == $friend_id ? $friendInfo->user2_remark : $friendInfo->user1_remark; $info['nickname_remark'] = UsersFriend::getFriendRemark($me_user_id, $friend_id);
} else { } else {
$res = UsersFriendsApply::where('user_id', $me_user_id) $res = UsersFriendsApply::where('user_id', $me_user_id)
->where('friend_id', $friend_id) ->where('friend_id', $friend_id)
@ -146,4 +142,35 @@ class UserService extends BaseService
return $info; return $info;
} }
/**
* 获取用户好友列表
*
* @param int $user_id 用户ID
* @return array
*/
public function getUserFriends(int $user_id)
{
return UsersFriend::leftJoin('users', 'users.id', '=', 'users_friends.friend_id')
->where('user_id', $user_id)->where('users_friends.status', 1)
->get([
'users.id',
'users.nickname',
'users.avatar',
'users.motto',
'users.gender',
'users_friends.remark as friend_remark',
])->toArray();
}
/**
* 获取指定用户的所有朋友的用户ID
*
* @param int $user_id 指定用户ID
* @return array
*/
public function getFriendIds(int $user_id)
{
return UsersFriend::where('user_id', $user_id)->where('status', 1)->pluck('friend_id')->toArray();
}
} }

View File

@ -251,9 +251,21 @@ function parse_ids($ids)
*/ */
function push_amqp(ProducerMessage $message, bool $confirm = false, int $timeout = 5) function push_amqp(ProducerMessage $message, bool $confirm = false, int $timeout = 5)
{ {
push_redis_subscribe('websocket', $message->getPayload());
return container()->get(Producer::class)->produce($message, $confirm, $timeout); return container()->get(Producer::class)->produce($message, $confirm, $timeout);
} }
/**
* 推送消息到 Redis 订阅中
*
* @param string $chan
* @param string|array $message
*/
function push_redis_subscribe(string $chan, $message)
{
redis()->publish($chan, is_string($message) ? $message : json_encode($message));
}
/** /**
* 生成随机文件名 * 生成随机文件名
* *