From 1905a605fabf29752dc9be3868388701329ef4d4 Mon Sep 17 00:00:00 2001 From: gzydong <837215079@qq.com> Date: Tue, 1 Dec 2020 17:47:25 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/Amqp/Consumer/ChatMessageConsumer.php | 48 ++++++++++++--------- app/Amqp/Producer/ChatMessageProducer.php | 17 ++++++-- app/Command/RemoveWsCacheCommand.php | 3 ++ app/Constants/SocketConstants.php | 44 +++++++++++++++++++ app/Controller/Api/V1/ArticleController.php | 43 ++++++++---------- app/Controller/Api/V1/AuthController.php | 10 ++--- app/Controller/WebSocketController.php | 9 ++-- app/Service/UserService.php | 12 +++--- 8 files changed, 120 insertions(+), 66 deletions(-) create mode 100644 app/Constants/SocketConstants.php diff --git a/app/Amqp/Consumer/ChatMessageConsumer.php b/app/Amqp/Consumer/ChatMessageConsumer.php index 28a3e73..8e7e3db 100644 --- a/app/Amqp/Consumer/ChatMessageConsumer.php +++ b/app/Amqp/Consumer/ChatMessageConsumer.php @@ -20,6 +20,7 @@ use App\Model\Chat\ChatRecordsInvite; use App\Model\Chat\ChatRecordsForward; use App\Service\SocketClientService; use App\Service\SocketRoomService; +use App\Constants\SocketConstants; /** * 消息推送消费者队列 @@ -33,7 +34,7 @@ class ChatMessageConsumer extends ConsumerMessage * * @var string */ - public $exchange = 'im.message.fanout'; + public $exchange = SocketConstants::CONSUMER_MESSAGE_EXCHANGE; /** * 交换机类型 @@ -60,23 +61,25 @@ class ChatMessageConsumer extends ConsumerMessage private $socketRoomService; /** - * 推送的消息类型推送绑定事件方法 + * 消息事件与回调事件绑定 + * + * @var array */ const EVENTS = [ // 聊天消息事件 - 'event_talk' => 'onConsumeTalk', + SocketConstants::EVENT_TALK => 'onConsumeTalk', // 键盘输入事件 - 'event_keyboard' => 'onConsumeKeyboard', + SocketConstants::EVENT_KEYBOARD => 'onConsumeKeyboard', // 用户在线状态事件 - 'event_online_status' => 'onConsumeOnlineStatus', + SocketConstants::EVENT_ONLINE_STATUS => 'onConsumeOnlineStatus', // 聊天消息推送事件 - 'event_revoke_talk' => 'onConsumeRevokeTalk', + SocketConstants::EVENT_REVOKE_TALK => 'onConsumeRevokeTalk', // 好友申请相关事件 - 'event_friend_apply' => 'onConsumeFriendApply' + SocketConstants::EVENT_FRIEND_APPLY => 'onConsumeFriendApply' ]; /** @@ -88,6 +91,8 @@ class ChatMessageConsumer extends ConsumerMessage { $this->socketClientService = $socketClientService; $this->socketRoomService = $socketRoomService; + + // 动态设置 Rabbit MQ 消费队列名称 $this->setQueue('queue:im_message:' . SERVER_RUN_ID); } @@ -113,6 +118,15 @@ class ChatMessageConsumer extends ConsumerMessage public function consumeMessage($data, AMQPMessage $message): string { if (isset($data['event'])) { + $redis = container()->get(Redis::class); + + //[加锁]防止消息重复消费 + $lockName = sprintf('ws:message-lock:%s:%s', SERVER_RUN_ID, $data['uuid']); + if (!$redis->rawCommand('SET', $lockName, 1, 'NX', 'EX', 60)) { + return Result::ACK; + } + + // 调用对应事件绑定的回调方法 return $this->{self::EVENTS[$data['event']]}($data, $message); } @@ -128,14 +142,6 @@ class ChatMessageConsumer extends ConsumerMessage */ public function onConsumeTalk(array $data, AMQPMessage $message): string { - $redis = container()->get(Redis::class); - - //[加锁]防止消息重复消费 - $lockName = sprintf('ws:message-lock:%s:%s', SERVER_RUN_ID, $data['uuid']); - if (!$redis->rawCommand('SET', $lockName, 1, 'NX', 'EX', 60)) { - return Result::ACK; - } - $source = $data['data']['source']; $fds = $this->socketClientService->findUserFds($data['data']['sender']); @@ -144,7 +150,7 @@ class ChatMessageConsumer extends ConsumerMessage } else if ($source == 2) {//群聊 $userIds = $this->socketRoomService->getRoomMembers(strval($data['data']['receive'])); foreach ($userIds as $uid) { - $fds = array_merge($fds, $this->socketClientService->findUserFds(intval($uid))); + $fds = array_merge($fds, $this->socketClientService->findUserFds((int)$uid)); } } @@ -241,7 +247,7 @@ class ChatMessageConsumer extends ConsumerMessage unset($result, $file, $code_block, $forward, $invite); - $this->socketPushNotify($fds, json_encode(['event_talk', $notify])); + $this->socketPushNotify($fds, json_encode([SocketConstants::EVENT_TALK, $notify])); return Result::ACK; } @@ -257,7 +263,7 @@ class ChatMessageConsumer extends ConsumerMessage { $fds = $this->socketClientService->findUserFds($data['data']['receive_user']); - $this->socketPushNotify($fds, json_encode(['event_keyboard', $data['data']])); + $this->socketPushNotify($fds, json_encode([SocketConstants::EVENT_KEYBOARD, $data['data']])); return Result::ACK; } @@ -278,7 +284,7 @@ class ChatMessageConsumer extends ConsumerMessage $fds = array_merge($fds, $this->socketClientService->findUserFds((int)$friend_id)); } - $this->socketPushNotify(array_unique($fds), json_encode(['event_online_status', $data['data']])); + $this->socketPushNotify(array_unique($fds), json_encode([SocketConstants::EVENT_ONLINE_STATUS, $data['data']])); return Result::ACK; } @@ -308,7 +314,7 @@ class ChatMessageConsumer extends ConsumerMessage $this->socketPushNotify( array_unique($fds), - json_encode(['event_revoke_talk', [ + json_encode([SocketConstants::EVENT_REVOKE_TALK, [ 'record_id' => $record->id, 'source' => $record->source, 'user_id' => $record->user_id, @@ -330,7 +336,7 @@ class ChatMessageConsumer extends ConsumerMessage { $fds = $this->socketClientService->findUserFds($data['data']['receive']); - $this->socketPushNotify(array_unique($fds), json_encode(['event_friend_apply', $data['data']])); + $this->socketPushNotify(array_unique($fds), json_encode([SocketConstants::EVENT_FRIEND_APPLY, $data['data']])); return Result::ACK; } diff --git a/app/Amqp/Producer/ChatMessageProducer.php b/app/Amqp/Producer/ChatMessageProducer.php index 4e36a23..26fe655 100644 --- a/app/Amqp/Producer/ChatMessageProducer.php +++ b/app/Amqp/Producer/ChatMessageProducer.php @@ -4,6 +4,7 @@ declare(strict_types=1); namespace App\Amqp\Producer; +use App\Constants\SocketConstants; use Hyperf\Amqp\Message\ProducerMessage; use Hyperf\Amqp\Message\Type; use Hyperf\Utils\Str; @@ -15,14 +16,22 @@ use Hyperf\Utils\Str; */ class ChatMessageProducer extends ProducerMessage { - // 交换机类型 + /** + * 交换机类型 + * + * @var string + */ public $type = Type::FANOUT; - // 交换机名称 - public $exchange = 'im.message.fanout'; + /** + * 交换机名称 + * + * @var string + */ + public $exchange = SocketConstants::CONSUMER_MESSAGE_EXCHANGE; /** - * 初始化处理... + * 实例化处理 * * @param string $event 事件名 * @param array $data 数据 diff --git a/app/Command/RemoveWsCacheCommand.php b/app/Command/RemoveWsCacheCommand.php index 4dec33c..0f6ea92 100644 --- a/app/Command/RemoveWsCacheCommand.php +++ b/app/Command/RemoveWsCacheCommand.php @@ -36,7 +36,10 @@ class RemoveWsCacheCommand extends HyperfCommand { $socket = new SocketClientService(); $this->line('此过程可能耗时较长,请耐心等待!', 'info'); + + // 获取所有已停止运行的服务ID $arr = $socket->getServerRunIdAll(2); + foreach ($arr as $run_id => $value) { go(function () use ($socket, $run_id) { $socket->removeRedisCache($run_id); diff --git a/app/Constants/SocketConstants.php b/app/Constants/SocketConstants.php new file mode 100644 index 0000000..02f5a99 --- /dev/null +++ b/app/Constants/SocketConstants.php @@ -0,0 +1,44 @@ +validate($this->request->all(), [ + $params1 = $this->request->inputs(['keyword', 'find_type', 'cid', 'page']); + $this->validate($params1, [ + // 搜索关键词 'keyword' => "present", + // 查询类型 $findType 1:获取近期日记 2:获取星标日记 3:获取指定分类文章 4:获取指定标签文章 5:获取已删除文章 6:关键词搜索 'find_type' => 'required|in:0,1,2,3,4,5,6', + // 分类ID 'cid' => 'present|integer|min:-1', 'page' => 'present|integer|min:1' ]); - // 查询类型 $findType 1:获取近期日记 2:获取星标日记 3:获取指定分类文章 4:获取指定标签文章 5:获取已删除文章 6:关键词搜索 - $findType = $this->request->input('find_type', 0); - $keyword = $this->request->input('keyword', '');// 搜索关键词 - $cid = $this->request->input('cid', -1);// 分类ID - $page = $this->request->input('page', 1); - $user_id = $this->uid(); - $params = []; - $params['find_type'] = $findType; - if (in_array($findType, [3, 4])) { - $params['class_id'] = $cid; + $params['find_type'] = $params1['find_type']; + if (in_array($params1['find_type'], [3, 4])) { + $params['class_id'] = $params1['cid']; } - if (!empty($keyword)) { - $params['keyword'] = $keyword; + if (!empty($params1['keyword'])) { + $params['keyword'] = $params1['keyword']; } return $this->response->success( - $this->articleService->getUserArticleList($user_id, intval($page), 10000, $params) + $this->articleService->getUserArticleList($this->uid(), 1, 10000, $params) ); } @@ -102,15 +99,13 @@ class ArticleController extends CController */ public function getArticleDetail() { - $this->validate($this->request->inputs(['article_id']), [ + $params = $this->request->inputs(['article_id']); + $this->validate($params, [ 'article_id' => 'required|integer' ]); return $this->response->success( - $this->articleService->getArticleDetail( - (int)$this->request->input('article_id'), - $this->uid() - ) + $this->articleService->getArticleDetail((int)$params['article_id'], $this->uid()) ); } @@ -170,11 +165,11 @@ class ArticleController extends CController $lockKey = "article_class_sort:{$params['class_id']}_{$params['sort_type']}"; // 获取Redis锁 - if (RedisLock::lock($lockKey, 0, 3)) { + if (RedisLock::lock($lockKey, 1, 3)) { $isTrue = $this->articleService->articleClassSort($this->uid(), (int)$params['class_id'], (int)$params['sort_type']); // 释放Redis锁 - RedisLock::release($lockKey, 0); + RedisLock::release($lockKey, 1); } else { $isTrue = false; } @@ -217,11 +212,7 @@ class ArticleController extends CController 'tag_name' => 'required' ]); - $id = $this->articleService->editArticleTag( - $this->uid(), - (int)$this->request->post('tag_id', 0), - $this->request->post('tag_name', '') - ); + $id = $this->articleService->editArticleTag($this->uid(), (int)$params['tag_id'], $params['tag_name']); return $id ? $this->response->success(['id' => $id]) diff --git a/app/Controller/Api/V1/AuthController.php b/app/Controller/Api/V1/AuthController.php index 962a824..7b4afcf 100644 --- a/app/Controller/Api/V1/AuthController.php +++ b/app/Controller/Api/V1/AuthController.php @@ -101,9 +101,9 @@ class AuthController extends CController { $params = $this->request->all(); $this->validate($params, [ - 'nickname' => "required", + 'nickname' => "required|max:20", 'mobile' => "required|regex:/^1[345789][0-9]{9}$/", - 'password' => 'required', + 'password' => 'required|max:16', 'sms_code' => 'required|digits:6', 'platform' => 'required|in:h5,ios,windows,mac,web', ]); @@ -135,10 +135,10 @@ class AuthController extends CController */ public function forget() { - $params = $this->request->all(); + $params = $this->request->inputs(['mobile', 'password', 'sms_code']); $this->validate($params, [ 'mobile' => "required|regex:/^1[345789][0-9]{9}$/", - 'password' => 'required', + 'password' => 'required|max:16', 'sms_code' => 'required|digits:6', ]); @@ -182,7 +182,7 @@ class AuthController extends CController */ public function sendVerifyCode() { - $params = $this->request->all(); + $params = $this->request->inputs(['type', 'mobile']); $this->validate($params, [ 'type' => "required", 'mobile' => "required|regex:/^1[345789][0-9]{9}$/" diff --git a/app/Controller/WebSocketController.php b/app/Controller/WebSocketController.php index ee33d4d..248e358 100644 --- a/app/Controller/WebSocketController.php +++ b/app/Controller/WebSocketController.php @@ -4,6 +4,7 @@ declare(strict_types=1); namespace App\Controller; use Hyperf\Di\Annotation\Inject; +use App\Constants\SocketConstants; use Hyperf\Contract\OnCloseInterface; use Hyperf\Contract\OnMessageInterface; use Hyperf\Contract\OnOpenInterface; @@ -57,8 +58,8 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos // 消息事件绑定 const EVENTS = [ - 'event_talk' => 'onTalk', - 'event_keyboard' => 'onKeyboard', + SocketConstants::EVENT_TALK => 'onTalk', + SocketConstants::EVENT_KEYBOARD => 'onKeyboard', ]; /** @@ -88,7 +89,7 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos if (!$isOnline) { // 推送消息至队列 $this->producer->produce( - new ChatMessageProducer('event_online_status', [ + new ChatMessageProducer(SocketConstants::EVENT_ONLINE_STATUS, [ 'user_id' => $userInfo['user_id'], 'status' => 1, 'notify' => '好友上线通知...' @@ -137,7 +138,7 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos if (!$isOnline) { // 推送消息至队列 $this->producer->produce( - new ChatMessageProducer('event_online_status', [ + new ChatMessageProducer(SocketConstants::EVENT_ONLINE_STATUS, [ 'user_id' => $user_id, 'status' => 0, 'notify' => '好友离线通知通知...' diff --git a/app/Service/UserService.php b/app/Service/UserService.php index dff6c73..b886b1f 100644 --- a/app/Service/UserService.php +++ b/app/Service/UserService.php @@ -34,9 +34,7 @@ class UserService extends BaseService */ public function login(string $mobile, string $password) { - $user = User::where('mobile', $mobile)->first(); - - if (!$user) { + if (!$user = User::where('mobile', $mobile)->first()) { return false; } @@ -55,13 +53,13 @@ class UserService extends BaseService */ public function register(array $data) { + Db::beginTransaction(); try { $data['password'] = create_password($data['password']); $data['created_at'] = date('Y-m-d H:i:s'); $result = User::create($data); - // 创建用户的默认笔记分类 ArticleClass::create([ 'user_id' => $result->id, @@ -70,7 +68,10 @@ class UserService extends BaseService 'sort' => 1, 'created_at' => time() ]); + + Db::commit(); } catch (\Exception $e) { + Db::rollBack(); $result = false; } @@ -98,8 +99,7 @@ class UserService extends BaseService */ public function changeMobile(int $user_id, string $mobile) { - $uid = User::where('mobile', $mobile)->value('id'); - if ($uid) { + if (User::where('mobile', $mobile)->value('id')) { return [false, '手机号已被他人绑定']; }