初始化

main
gzydong 2020-12-01 17:47:25 +08:00
parent 7ea3d1be4e
commit 1905a605fa
8 changed files with 120 additions and 66 deletions

View File

@ -20,6 +20,7 @@ use App\Model\Chat\ChatRecordsInvite;
use App\Model\Chat\ChatRecordsForward;
use App\Service\SocketClientService;
use App\Service\SocketRoomService;
use App\Constants\SocketConstants;
/**
* 消息推送消费者队列
@ -33,7 +34,7 @@ class ChatMessageConsumer extends ConsumerMessage
*
* @var string
*/
public $exchange = 'im.message.fanout';
public $exchange = SocketConstants::CONSUMER_MESSAGE_EXCHANGE;
/**
* 交换机类型
@ -60,23 +61,25 @@ class ChatMessageConsumer extends ConsumerMessage
private $socketRoomService;
/**
* 推送的消息类型推送绑定事件方法
* 消息事件与回调事件绑定
*
* @var array
*/
const EVENTS = [
// 聊天消息事件
'event_talk' => 'onConsumeTalk',
SocketConstants::EVENT_TALK => 'onConsumeTalk',
// 键盘输入事件
'event_keyboard' => 'onConsumeKeyboard',
SocketConstants::EVENT_KEYBOARD => 'onConsumeKeyboard',
// 用户在线状态事件
'event_online_status' => 'onConsumeOnlineStatus',
SocketConstants::EVENT_ONLINE_STATUS => 'onConsumeOnlineStatus',
// 聊天消息推送事件
'event_revoke_talk' => 'onConsumeRevokeTalk',
SocketConstants::EVENT_REVOKE_TALK => 'onConsumeRevokeTalk',
// 好友申请相关事件
'event_friend_apply' => 'onConsumeFriendApply'
SocketConstants::EVENT_FRIEND_APPLY => 'onConsumeFriendApply'
];
/**
@ -88,6 +91,8 @@ class ChatMessageConsumer extends ConsumerMessage
{
$this->socketClientService = $socketClientService;
$this->socketRoomService = $socketRoomService;
// 动态设置 Rabbit MQ 消费队列名称
$this->setQueue('queue:im_message:' . SERVER_RUN_ID);
}
@ -113,6 +118,15 @@ class ChatMessageConsumer extends ConsumerMessage
public function consumeMessage($data, AMQPMessage $message): string
{
if (isset($data['event'])) {
$redis = container()->get(Redis::class);
//[加锁]防止消息重复消费
$lockName = sprintf('ws:message-lock:%s:%s', SERVER_RUN_ID, $data['uuid']);
if (!$redis->rawCommand('SET', $lockName, 1, 'NX', 'EX', 60)) {
return Result::ACK;
}
// 调用对应事件绑定的回调方法
return $this->{self::EVENTS[$data['event']]}($data, $message);
}
@ -128,14 +142,6 @@ class ChatMessageConsumer extends ConsumerMessage
*/
public function onConsumeTalk(array $data, AMQPMessage $message): string
{
$redis = container()->get(Redis::class);
//[加锁]防止消息重复消费
$lockName = sprintf('ws:message-lock:%s:%s', SERVER_RUN_ID, $data['uuid']);
if (!$redis->rawCommand('SET', $lockName, 1, 'NX', 'EX', 60)) {
return Result::ACK;
}
$source = $data['data']['source'];
$fds = $this->socketClientService->findUserFds($data['data']['sender']);
@ -144,7 +150,7 @@ class ChatMessageConsumer extends ConsumerMessage
} else if ($source == 2) {//群聊
$userIds = $this->socketRoomService->getRoomMembers(strval($data['data']['receive']));
foreach ($userIds as $uid) {
$fds = array_merge($fds, $this->socketClientService->findUserFds(intval($uid)));
$fds = array_merge($fds, $this->socketClientService->findUserFds((int)$uid));
}
}
@ -241,7 +247,7 @@ class ChatMessageConsumer extends ConsumerMessage
unset($result, $file, $code_block, $forward, $invite);
$this->socketPushNotify($fds, json_encode(['event_talk', $notify]));
$this->socketPushNotify($fds, json_encode([SocketConstants::EVENT_TALK, $notify]));
return Result::ACK;
}
@ -257,7 +263,7 @@ class ChatMessageConsumer extends ConsumerMessage
{
$fds = $this->socketClientService->findUserFds($data['data']['receive_user']);
$this->socketPushNotify($fds, json_encode(['event_keyboard', $data['data']]));
$this->socketPushNotify($fds, json_encode([SocketConstants::EVENT_KEYBOARD, $data['data']]));
return Result::ACK;
}
@ -278,7 +284,7 @@ class ChatMessageConsumer extends ConsumerMessage
$fds = array_merge($fds, $this->socketClientService->findUserFds((int)$friend_id));
}
$this->socketPushNotify(array_unique($fds), json_encode(['event_online_status', $data['data']]));
$this->socketPushNotify(array_unique($fds), json_encode([SocketConstants::EVENT_ONLINE_STATUS, $data['data']]));
return Result::ACK;
}
@ -308,7 +314,7 @@ class ChatMessageConsumer extends ConsumerMessage
$this->socketPushNotify(
array_unique($fds),
json_encode(['event_revoke_talk', [
json_encode([SocketConstants::EVENT_REVOKE_TALK, [
'record_id' => $record->id,
'source' => $record->source,
'user_id' => $record->user_id,
@ -330,7 +336,7 @@ class ChatMessageConsumer extends ConsumerMessage
{
$fds = $this->socketClientService->findUserFds($data['data']['receive']);
$this->socketPushNotify(array_unique($fds), json_encode(['event_friend_apply', $data['data']]));
$this->socketPushNotify(array_unique($fds), json_encode([SocketConstants::EVENT_FRIEND_APPLY, $data['data']]));
return Result::ACK;
}

View File

@ -4,6 +4,7 @@ declare(strict_types=1);
namespace App\Amqp\Producer;
use App\Constants\SocketConstants;
use Hyperf\Amqp\Message\ProducerMessage;
use Hyperf\Amqp\Message\Type;
use Hyperf\Utils\Str;
@ -15,14 +16,22 @@ use Hyperf\Utils\Str;
*/
class ChatMessageProducer extends ProducerMessage
{
// 交换机类型
/**
* 交换机类型
*
* @var string
*/
public $type = Type::FANOUT;
// 交换机名称
public $exchange = 'im.message.fanout';
/**
* 交换机名称
*
* @var string
*/
public $exchange = SocketConstants::CONSUMER_MESSAGE_EXCHANGE;
/**
* 初始化处理...
* 实例化处理
*
* @param string $event 事件名
* @param array $data 数据

View File

@ -36,7 +36,10 @@ class RemoveWsCacheCommand extends HyperfCommand
{
$socket = new SocketClientService();
$this->line('此过程可能耗时较长,请耐心等待!', 'info');
// 获取所有已停止运行的服务ID
$arr = $socket->getServerRunIdAll(2);
foreach ($arr as $run_id => $value) {
go(function () use ($socket, $run_id) {
$socket->removeRedisCache($run_id);

View File

@ -0,0 +1,44 @@
<?php
namespace App\Constants;
use Hyperf\Constants\AbstractConstants;
use Hyperf\Constants\Annotation\Constants;
/**
* WebSocket 消息事件
*
* @Constants
*/
class SocketConstants extends AbstractConstants
{
/**
* @Message("对话消息通知 - 事件名)
*/
const EVENT_TALK = 'event_talk';
/**
* @Message("键盘输入事件通知 - 事件名")
*/
const EVENT_KEYBOARD = 'event_keyboard';
/**
* @Message("用户在线状态通知 - 事件名")
*/
const EVENT_ONLINE_STATUS = 'event_online_status';
/**
* @Message("聊天消息撤销通知 - 事件名")
*/
const EVENT_REVOKE_TALK = 'event_revoke_talk';
/**
* @Message("好友申请消息通知 - 事件名")
*/
const EVENT_FRIEND_APPLY = 'event_friend_apply';
/**
* @Message("WebSocket 消息消费队列交换机名称")
*/
const CONSUMER_MESSAGE_EXCHANGE = 'im.message.fanout';
}

View File

@ -66,32 +66,29 @@ class ArticleController extends CController
*/
public function getArticleList()
{
$this->validate($this->request->all(), [
$params1 = $this->request->inputs(['keyword', 'find_type', 'cid', 'page']);
$this->validate($params1, [
// 搜索关键词
'keyword' => "present",
// 查询类型 $findType 1:获取近期日记 2:获取星标日记 3:获取指定分类文章 4:获取指定标签文章 5:获取已删除文章 6:关键词搜索
'find_type' => 'required|in:0,1,2,3,4,5,6',
// 分类ID
'cid' => 'present|integer|min:-1',
'page' => 'present|integer|min:1'
]);
// 查询类型 $findType 1:获取近期日记 2:获取星标日记 3:获取指定分类文章 4:获取指定标签文章 5:获取已删除文章 6:关键词搜索
$findType = $this->request->input('find_type', 0);
$keyword = $this->request->input('keyword', '');// 搜索关键词
$cid = $this->request->input('cid', -1);// 分类ID
$page = $this->request->input('page', 1);
$user_id = $this->uid();
$params = [];
$params['find_type'] = $findType;
if (in_array($findType, [3, 4])) {
$params['class_id'] = $cid;
$params['find_type'] = $params1['find_type'];
if (in_array($params1['find_type'], [3, 4])) {
$params['class_id'] = $params1['cid'];
}
if (!empty($keyword)) {
$params['keyword'] = $keyword;
if (!empty($params1['keyword'])) {
$params['keyword'] = $params1['keyword'];
}
return $this->response->success(
$this->articleService->getUserArticleList($user_id, intval($page), 10000, $params)
$this->articleService->getUserArticleList($this->uid(), 1, 10000, $params)
);
}
@ -102,15 +99,13 @@ class ArticleController extends CController
*/
public function getArticleDetail()
{
$this->validate($this->request->inputs(['article_id']), [
$params = $this->request->inputs(['article_id']);
$this->validate($params, [
'article_id' => 'required|integer'
]);
return $this->response->success(
$this->articleService->getArticleDetail(
(int)$this->request->input('article_id'),
$this->uid()
)
$this->articleService->getArticleDetail((int)$params['article_id'], $this->uid())
);
}
@ -170,11 +165,11 @@ class ArticleController extends CController
$lockKey = "article_class_sort:{$params['class_id']}_{$params['sort_type']}";
// 获取Redis锁
if (RedisLock::lock($lockKey, 0, 3)) {
if (RedisLock::lock($lockKey, 1, 3)) {
$isTrue = $this->articleService->articleClassSort($this->uid(), (int)$params['class_id'], (int)$params['sort_type']);
// 释放Redis锁
RedisLock::release($lockKey, 0);
RedisLock::release($lockKey, 1);
} else {
$isTrue = false;
}
@ -217,11 +212,7 @@ class ArticleController extends CController
'tag_name' => 'required'
]);
$id = $this->articleService->editArticleTag(
$this->uid(),
(int)$this->request->post('tag_id', 0),
$this->request->post('tag_name', '')
);
$id = $this->articleService->editArticleTag($this->uid(), (int)$params['tag_id'], $params['tag_name']);
return $id
? $this->response->success(['id' => $id])

View File

@ -101,9 +101,9 @@ class AuthController extends CController
{
$params = $this->request->all();
$this->validate($params, [
'nickname' => "required",
'nickname' => "required|max:20",
'mobile' => "required|regex:/^1[345789][0-9]{9}$/",
'password' => 'required',
'password' => 'required|max:16',
'sms_code' => 'required|digits:6',
'platform' => 'required|in:h5,ios,windows,mac,web',
]);
@ -135,10 +135,10 @@ class AuthController extends CController
*/
public function forget()
{
$params = $this->request->all();
$params = $this->request->inputs(['mobile', 'password', 'sms_code']);
$this->validate($params, [
'mobile' => "required|regex:/^1[345789][0-9]{9}$/",
'password' => 'required',
'password' => 'required|max:16',
'sms_code' => 'required|digits:6',
]);
@ -182,7 +182,7 @@ class AuthController extends CController
*/
public function sendVerifyCode()
{
$params = $this->request->all();
$params = $this->request->inputs(['type', 'mobile']);
$this->validate($params, [
'type' => "required",
'mobile' => "required|regex:/^1[345789][0-9]{9}$/"

View File

@ -4,6 +4,7 @@ declare(strict_types=1);
namespace App\Controller;
use Hyperf\Di\Annotation\Inject;
use App\Constants\SocketConstants;
use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
@ -57,8 +58,8 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
// 消息事件绑定
const EVENTS = [
'event_talk' => 'onTalk',
'event_keyboard' => 'onKeyboard',
SocketConstants::EVENT_TALK => 'onTalk',
SocketConstants::EVENT_KEYBOARD => 'onKeyboard',
];
/**
@ -88,7 +89,7 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
if (!$isOnline) {
// 推送消息至队列
$this->producer->produce(
new ChatMessageProducer('event_online_status', [
new ChatMessageProducer(SocketConstants::EVENT_ONLINE_STATUS, [
'user_id' => $userInfo['user_id'],
'status' => 1,
'notify' => '好友上线通知...'
@ -137,7 +138,7 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
if (!$isOnline) {
// 推送消息至队列
$this->producer->produce(
new ChatMessageProducer('event_online_status', [
new ChatMessageProducer(SocketConstants::EVENT_ONLINE_STATUS, [
'user_id' => $user_id,
'status' => 0,
'notify' => '好友离线通知通知...'

View File

@ -34,9 +34,7 @@ class UserService extends BaseService
*/
public function login(string $mobile, string $password)
{
$user = User::where('mobile', $mobile)->first();
if (!$user) {
if (!$user = User::where('mobile', $mobile)->first()) {
return false;
}
@ -55,13 +53,13 @@ class UserService extends BaseService
*/
public function register(array $data)
{
Db::beginTransaction();
try {
$data['password'] = create_password($data['password']);
$data['created_at'] = date('Y-m-d H:i:s');
$result = User::create($data);
// 创建用户的默认笔记分类
ArticleClass::create([
'user_id' => $result->id,
@ -70,7 +68,10 @@ class UserService extends BaseService
'sort' => 1,
'created_at' => time()
]);
Db::commit();
} catch (\Exception $e) {
Db::rollBack();
$result = false;
}
@ -98,8 +99,7 @@ class UserService extends BaseService
*/
public function changeMobile(int $user_id, string $mobile)
{
$uid = User::where('mobile', $mobile)->value('id');
if ($uid) {
if (User::where('mobile', $mobile)->value('id')) {
return [false, '手机号已被他人绑定'];
}