diff --git a/app/Amqp/Consumer/ChatMessageConsumer.php b/app/Amqp/Consumer/ChatMessageConsumer.php index fe21dc4..0c6a395 100644 --- a/app/Amqp/Consumer/ChatMessageConsumer.php +++ b/app/Amqp/Consumer/ChatMessageConsumer.php @@ -13,6 +13,8 @@ namespace App\Amqp\Consumer; use App\Constants\TalkMsgType; use App\Constants\TalkType; +use App\Model\UsersFriendsApply; +use App\Service\UserService; use Hyperf\Amqp\Annotation\Consumer; use Hyperf\Amqp\Result; use Hyperf\Amqp\Message\ConsumerMessage; @@ -20,7 +22,6 @@ use Hyperf\Amqp\Message\Type; use Hyperf\Amqp\Builder\QueueBuilder; use PhpAmqpLib\Message\AMQPMessage; use App\Model\User; -use App\Model\UsersFriend; use App\Model\Chat\TalkRecords; use App\Model\Chat\TalkRecordsCode; use App\Model\Chat\TalkRecordsFile; @@ -34,7 +35,7 @@ use App\Cache\SocketRoom; /** * 消息推送消费者队列 - * @Consumer(name="ConsumerChat",enable=true) + * @Consumer(name="ConsumerChat",enable=false) */ class ChatMessageConsumer extends ConsumerMessage { @@ -287,7 +288,9 @@ class ChatMessageConsumer extends ConsumerMessage $status = (int)$data['data']['status']; $fds = []; - foreach (UsersFriend::getFriendIds($user_id) as $friend_id) { + + $ids = container()->get(UserService::class)->getFriendIds($user_id); + foreach ($ids as $friend_id) { $fds = array_merge($fds, $this->socketClientService->findUserFds(intval($friend_id))); } @@ -343,12 +346,38 @@ class ChatMessageConsumer extends ConsumerMessage */ public function onConsumeFriendApply(array $data, AMQPMessage $message): string { - $fds = $this->socketClientService->findUserFds($data['data']['receive']); + $data = $data['data']; - $this->socketPushNotify(array_unique($fds), json_encode([ - SocketConstants::EVENT_FRIEND_APPLY, - $data['data'] - ])); + $applyInfo = UsersFriendsApply::where('id', $data['apply_id'])->first(); + if (!$applyInfo) return Result::ACK; + + $fds = $this->socketClientService->findUserFds($data['type'] == 1 ? $applyInfo->friend_id : $applyInfo->user_id); + + if ($data['type'] == 1) { + $msg = [ + 'sender_id' => $applyInfo->user_id, + 'receiver_id' => $applyInfo->friend_id, + 'remark' => $applyInfo->remark, + ]; + } else { + $msg = [ + 'sender_id' => $applyInfo->friend_id, + 'receiver_id' => $applyInfo->user_id, + 'status' => $applyInfo->status, + 'remark' => $applyInfo->remark, + ]; + } + + $friendInfo = User::select(['id', 'avatar', 'nickname', 'mobile', 'motto'])->find($data['type'] == 1 ? $applyInfo->user_id : $applyInfo->friend_id); + + $msg['friend'] = [ + 'user_id' => $friendInfo->id, + 'avatar' => $friendInfo->avatar, + 'nickname' => $friendInfo->nickname, + 'mobile' => $friendInfo->mobile, + ]; + + $this->socketPushNotify(array_unique($fds), json_encode([SocketConstants::EVENT_FRIEND_APPLY, $msg])); return Result::ACK; } diff --git a/app/Amqp/Producer/ChatMessageProducer.php b/app/Amqp/Producer/ChatMessageProducer.php index 34a3f0e..792ad8a 100644 --- a/app/Amqp/Producer/ChatMessageProducer.php +++ b/app/Amqp/Producer/ChatMessageProducer.php @@ -54,4 +54,9 @@ class ChatMessageProducer extends ProducerMessage $this->payload = $message; } + + public function getPayload() + { + return $this->payload; + } } diff --git a/app/Controller/Api/V1/ContactsApplyController.php b/app/Controller/Api/V1/ContactsApplyController.php new file mode 100644 index 0000000..c362ae9 --- /dev/null +++ b/app/Controller/Api/V1/ContactsApplyController.php @@ -0,0 +1,171 @@ +request->inputs(['friend_id', 'remarks']); + $this->validate($params, [ + 'friend_id' => 'required|integer', + 'remarks' => 'present|max:50' + ]); + + $params['friend_id'] = (int)$params['friend_id']; + + $user = $userService->findById($params['friend_id']); + if (!$user) { + return $this->response->fail('用户不存在!'); + } + + $user_id = $this->uid(); + + [$isTrue, $result] = $this->service->create($user_id, $params['friend_id'], $params['remarks']); + if (!$isTrue) { + return $this->response->fail('添加好友申请失败!'); + } + + // 好友申请未读消息数自增 + FriendApply::getInstance()->incr($params['friend_id'], 1); + + // 判断对方是否在线。如果在线发送消息通知 + if ($this->socketClientService->isOnlineAll($params['friend_id'])) { + push_amqp(new ChatMessageProducer(SocketConstants::EVENT_FRIEND_APPLY, [ + 'apply_id' => $result->id, + 'type' => 1, + ])); + } + + return $this->response->success([], '发送好友申请成功...'); + } + + /** + * @RequestMapping(path="accept", methods="post") + * @return ResponseInterface + */ + public function accept() + { + $params = $this->request->inputs(['apply_id', 'remarks']); + $this->validate($params, [ + 'apply_id' => 'required|integer', + 'remarks' => 'present|max:20' + ]); + + $user_id = $this->uid(); + $isTrue = $this->service->accept($user_id, intval($params['apply_id']), $params['remarks']); + if (!$isTrue) { + return $this->response->fail('处理失败!'); + } + + $friend_id = UsersFriendsApply::where('id', $params['apply_id'])->where('friend_id', $user_id)->value('user_id'); + + // 判断对方是否在线。如果在线发送消息通知 + if ($this->socketClientService->isOnlineAll($friend_id)) { + push_amqp(new ChatMessageProducer(SocketConstants::EVENT_FRIEND_APPLY, [ + 'apply_id' => (int)$params['apply_id'], + 'type' => 2, + ])); + } + + return $this->response->success([], '处理成功...'); + } + + /** + * @RequestMapping(path="decline", methods="post") + * @return ResponseInterface + */ + public function decline() + { + $params = $this->request->inputs(['apply_id', 'remarks']); + $this->validate($params, [ + 'apply_id' => 'required|integer', + 'remarks' => 'present|max:20' + ]); + + $isTrue = $this->service->decline($this->uid(), intval($params['apply_id']), $params['remarks']); + return $isTrue + ? $this->response->success() + : $this->response->fail(); + } + + /** + * @RequestMapping(path="delete", methods="post") + * @return ResponseInterface + */ + public function delete() + { + $params = $this->request->inputs(['apply_id']); + $this->validate($params, [ + 'apply_id' => 'required|integer' + ]); + + return $this->service->delete($this->uid(), intval($params['apply_id'])) + ? $this->response->success() + : $this->response->fail(); + } + + /** + * 获取联系人申请未读数 + * @RequestMapping(path="records", methods="get") + * + * @return ResponseInterface + */ + public function records() + { + $params = $this->request->inputs(['page', 'page_size']); + $this->validate($params, [ + 'page' => 'present|integer', + 'page_size' => 'present|integer' + ]); + + $page = $this->request->input('page', 1); + $page_size = $this->request->input('page_size', 10); + $user_id = $this->uid(); + + $data = $this->service->getApplyRecords($user_id, $page, $page_size); + + FriendApply::getInstance()->rem(strval($user_id)); + + return $this->response->success($data); + } +} diff --git a/app/Controller/Api/V1/ContactsController.php b/app/Controller/Api/V1/ContactsController.php index f5a3d40..a949e37 100644 --- a/app/Controller/Api/V1/ContactsController.php +++ b/app/Controller/Api/V1/ContactsController.php @@ -11,18 +11,15 @@ namespace App\Controller\Api\V1; use App\Model\UsersFriend; +use App\Service\UserService; use Hyperf\Di\Annotation\Inject; use Hyperf\HttpServer\Annotation\Controller; use Hyperf\HttpServer\Annotation\RequestMapping; use Hyperf\HttpServer\Annotation\Middleware; use App\Middleware\JWTAuthMiddleware; use Psr\Http\Message\ResponseInterface; -use App\Amqp\Producer\ChatMessageProducer; use App\Service\ContactsService; use App\Service\SocketClientService; -use App\Service\UserService; -use App\Constants\SocketConstants; -use App\Model\UsersFriendsApply; use App\Model\TalkList; use App\Cache\FriendApply; use App\Cache\FriendRemark; @@ -41,13 +38,7 @@ class ContactsController extends CController * @Inject * @var ContactsService */ - private $contactsService; - - /** - * @inject - * @var SocketClientService - */ - private $socketClientService; + private $service; /** * 获取用户联系人列表 @@ -55,61 +46,19 @@ class ContactsController extends CController * * @return ResponseInterface */ - public function getContacts() + public function getContacts(UserService $service) { - $rows = UsersFriend::getUserFriends($this->uid()); + $rows = $service->getUserFriends($this->uid()); if ($rows) { $runArr = ServerRunID::getInstance()->getServerRunIdAll(); foreach ($rows as $k => $row) { - $rows[$k]['is_online'] = $this->socketClientService->isOnlineAll($row['id'], $runArr); + $rows[$k]['is_online'] = container()->get(SocketClientService::class)->isOnlineAll($row['id'], $runArr); } } return $this->response->success($rows); } - /** - * 添加联系人 - * @RequestMapping(path="add", methods="post") - * - * @param UserService $userService - * @return ResponseInterface - */ - public function addContact(UserService $userService) - { - $params = $this->request->inputs(['friend_id', 'remarks']); - $this->validate($params, [ - 'friend_id' => 'required|integer', - 'remarks' => 'present|max:50' - ]); - - $user = $userService->findById($params['friend_id']); - if (!$user) { - return $this->response->fail('用户不存在!'); - } - - $user_id = $this->uid(); - if (!$this->contactsService->addContact($user_id, intval($params['friend_id']), $params['remarks'])) { - return $this->response->fail('添加好友申请失败!'); - } - - // 好友申请未读消息数自增 - FriendApply::getInstance()->incr($params['friend_id'], 1); - - // 判断对方是否在线。如果在线发送消息通知 - if ($this->socketClientService->isOnlineAll(intval($params['friend_id']))) { - push_amqp(new ChatMessageProducer(SocketConstants::EVENT_FRIEND_APPLY, [ - 'sender' => $user_id, - 'receive' => intval($params['friend_id']), - 'type' => 1, - 'status' => 1, - 'remark' => '' - ])); - } - - return $this->response->success([], '发送好友申请成功...'); - } - /** * 删除联系人 * @RequestMapping(path="delete", methods="post") @@ -124,7 +73,7 @@ class ContactsController extends CController ]); $user_id = $this->uid(); - if (!$this->contactsService->deleteContact($user_id, intval($params['friend_id']))) { + if (!$this->service->delete($user_id, intval($params['friend_id']))) { return $this->response->fail('好友关系解除失败!'); } @@ -132,91 +81,11 @@ class ContactsController extends CController TalkList::delItem($user_id, $params['friend_id'], 2); TalkList::delItem($params['friend_id'], $user_id, 2); - // ... TODO 推送消息(待完善) + // TODO 推送消息(待完善) return $this->response->success([], '好友关系解除成功...'); } - /** - * 同意添加联系人 - * @RequestMapping(path="accept-invitation", methods="post") - * - * @return ResponseInterface - */ - public function acceptInvitation() - { - $params = $this->request->inputs(['apply_id', 'remarks']); - $this->validate($params, [ - 'apply_id' => 'required|integer', - 'remarks' => 'present|max:20' - ]); - - $user_id = $this->uid(); - $isTrue = $this->contactsService->acceptInvitation($user_id, intval($params['apply_id']), $params['remarks']); - if (!$isTrue) { - return $this->response->fail('处理失败!'); - } - - $friend_id = $info = UsersFriendsApply::where('id', $params['apply_id']) - ->where('friend_id', $user_id) - ->value('user_id'); - - // 判断对方是否在线。如果在线发送消息通知 - if ($this->socketClientService->isOnlineAll($friend_id)) { - // TODO 待完善 - push_amqp(new ChatMessageProducer(SocketConstants::EVENT_FRIEND_APPLY, [ - 'sender' => $user_id, - 'receive' => $friend_id, - 'type' => 1, - 'status' => 1, - 'remark' => '' - ])); - } - - return $this->response->success([], '处理成功...'); - } - - /** - * 拒绝添加联系人(预留) - * @RequestMapping(path="decline-invitation", methods="post") - * - * @return ResponseInterface - */ - public function declineInvitation() - { - $params = $this->request->inputs(['apply_id', 'remarks']); - $this->validate($params, [ - 'apply_id' => 'required|integer', - 'remarks' => 'present|max:20' - ]); - - $isTrue = $this->contactsService->declineInvitation($this->uid(), intval($params['apply_id']), $params['remarks']); - - return $isTrue - ? $this->response->success() - : $this->response->fail(); - } - - /** - * 删除联系人申请记录 - * @RequestMapping(path="delete-apply", methods="post") - * - * @return ResponseInterface - */ - public function deleteContactApply() - { - $params = $this->request->inputs(['apply_id']); - $this->validate($params, [ - 'apply_id' => 'required|integer' - ]); - - $isTrue = $this->contactsService->delContactApplyRecord($this->uid(), intval($params['apply_id'])); - - return $isTrue - ? $this->response->success() - : $this->response->fail(); - } - /** * 获取联系人申请未读数 * @RequestMapping(path="apply-unread-num", methods="get") @@ -230,31 +99,6 @@ class ContactsController extends CController ]); } - /** - * 获取联系人申请未读数 - * @RequestMapping(path="apply-records", methods="get") - * - * @return ResponseInterface - */ - public function getContactApplyRecords() - { - $params = $this->request->inputs(['page', 'page_size']); - $this->validate($params, [ - 'page' => 'present|integer', - 'page_size' => 'present|integer' - ]); - - $page = $this->request->input('page', 1); - $page_size = $this->request->input('page_size', 10); - $user_id = $this->uid(); - - $data = $this->contactsService->getContactApplyRecords($user_id, $page, $page_size); - - FriendApply::getInstance()->rem(strval($user_id)); - - return $this->response->success($data); - } - /** * 搜索联系人 * @RequestMapping(path="search", methods="get") @@ -268,7 +112,7 @@ class ContactsController extends CController 'mobile' => "present|regex:/^1[3456789][0-9]{9}$/" ]); - $result = $this->contactsService->findContact($params['mobile']); + $result = $this->service->findContact($params['mobile']); return $this->response->success($result); } @@ -287,7 +131,7 @@ class ContactsController extends CController ]); $user_id = $this->uid(); - $isTrue = $this->contactsService->editContactRemark($user_id, intval($params['friend_id']), $params['remarks']); + $isTrue = $this->service->editRemark($user_id, intval($params['friend_id']), $params['remarks']); if (!$isTrue) { return $this->response->fail('备注修改失败!'); } diff --git a/app/Controller/Api/V1/GroupController.php b/app/Controller/Api/V1/GroupController.php index 83f52b8..815c807 100644 --- a/app/Controller/Api/V1/GroupController.php +++ b/app/Controller/Api/V1/GroupController.php @@ -12,6 +12,7 @@ namespace App\Controller\Api\V1; use App\Cache\SocketRoom; use App\Constants\TalkType; +use App\Service\UserService; use Hyperf\Di\Annotation\Inject; use Hyperf\HttpServer\Annotation\Controller; use Hyperf\HttpServer\Annotation\RequestMapping; @@ -329,10 +330,10 @@ class GroupController extends CController * * @return ResponseInterface */ - public function getInviteFriends() + public function getInviteFriends(UserService $service) { $group_id = $this->request->input('group_id', 0); - $friends = UsersFriend::getUserFriends($this->uid()); + $friends = $service->getUserFriends($this->uid()); if ($group_id > 0 && $friends) { if ($ids = GroupMember::getGroupMemberIds($group_id)) { foreach ($friends as $k => $item) { diff --git a/app/Model/UsersFriend.php b/app/Model/UsersFriend.php index f155612..8921fe9 100644 --- a/app/Model/UsersFriend.php +++ b/app/Model/UsersFriend.php @@ -4,6 +4,8 @@ declare (strict_types=1); namespace App\Model; +use App\Cache\FriendRemark; + /** * 表情包收藏数据表模型 * @@ -24,6 +26,7 @@ class UsersFriend extends BaseModel 'user_id', 'friend_id', 'status', + 'remark', 'created_at', 'updated_at', ]; @@ -37,35 +40,16 @@ class UsersFriend extends BaseModel 'updated_at' => 'datetime', ]; - /** - * 获取用户所有好友 - * - * @param int $user_id 用户ID - * @return array - */ - public static function getUserFriends(int $user_id) - { - return UsersFriend::leftJoin('users', 'users.id', '=', 'users_friends.friend_id') - ->where('user_id', $user_id)->where('users_friends.status', 1) - ->get([ - 'users.id', - 'users.nickname', - 'users.avatar', - 'users.motto', - 'users.gender', - 'users_friends.remark as friend_remark', - ])->toArray(); - } - /** * 判断用户之间是否存在好友关系 * * @param int $user_id 用户ID * @param int $friend_id 好友ID * @param bool $is_cache 是否允许读取缓存 + * @param bool $is_mutual 相互互为好友 * @return bool */ - public static function isFriend(int $user_id, int $friend_id, bool $is_cache = false) + public static function isFriend(int $user_id, int $friend_id, bool $is_cache = false, $is_mutual = false) { $cacheKey = "good_friends:{$user_id}_{$friend_id}"; if ($is_cache && redis()->get($cacheKey)) { @@ -81,13 +65,20 @@ class UsersFriend extends BaseModel } /** - * 获取指定用户的所有朋友的用户ID + * 获取好友备注 * - * @param int $user_id 指定用户ID - * @return array + * @param int $user_id 用户ID + * @param int $friend_id 好友ID + * @return string */ - public static function getFriendIds(int $user_id) + public static function getFriendRemark(int $user_id, int $friend_id) { - return UsersFriend::where('user_id', $user_id)->where('status', 1)->pluck('friend_id')->toArray(); + $remark = FriendRemark::getInstance()->read($user_id, $friend_id); + if ($remark) return $remark; + + $remark = UsersFriend::where('user_id', $user_id)->where('friend_id', $friend_id)->value('remark'); + if ($remark) FriendRemark::getInstance()->save($user_id, $friend_id, $remark); + + return (string)$remark; } } diff --git a/app/Model/UsersFriendsApply.php b/app/Model/UsersFriendsApply.php index 9534750..bd38394 100644 --- a/app/Model/UsersFriendsApply.php +++ b/app/Model/UsersFriendsApply.php @@ -11,7 +11,7 @@ namespace App\Model; * @property int $user_id 用户ID * @property int $friend_id 朋友ID * @property int $status 申请状态 - * @property string $remarks 备注说明 + * @property string $remark 备注说明 * @property string $created_at 创建时间 * @property string $updated_at 更新时间 * @package App\Model diff --git a/app/Process/RedisSubscribe.php b/app/Process/RedisSubscribe.php deleted file mode 100644 index 5bf5df2..0000000 --- a/app/Process/RedisSubscribe.php +++ /dev/null @@ -1,43 +0,0 @@ -subscribe($this->chans, [$this, 'subscribe']); - } - - /** - * 订阅处理逻辑 - * - * @param $redis - * @param string $chan - * @param string $msg - */ - public function subscribe($redis, string $chan, string $msg) - { - echo PHP_EOL . "chan : $chan , msg : $msg"; - } - - public function isEnable($server): bool - { - return false; - } -} diff --git a/app/Process/RedisWebsocketSubscribe.php b/app/Process/RedisWebsocketSubscribe.php new file mode 100644 index 0000000..2d51e5a --- /dev/null +++ b/app/Process/RedisWebsocketSubscribe.php @@ -0,0 +1,385 @@ + 'onConsumeTalk', + + // 键盘输入事件 + SocketConstants::EVENT_KEYBOARD => 'onConsumeKeyboard', + + // 用户在线状态事件 + SocketConstants::EVENT_ONLINE_STATUS => 'onConsumeOnlineStatus', + + // 聊天消息推送事件 + SocketConstants::EVENT_REVOKE_TALK => 'onConsumeRevokeTalk', + + // 好友申请相关事件 + SocketConstants::EVENT_FRIEND_APPLY => 'onConsumeFriendApply' + ]; + + /** + * @var SocketClientService + */ + private $socketClientService; + + public function handle(): void + { + $this->socketClientService = container()->get(SocketClientService::class); + + redis()->subscribe($this->chans, [$this, 'subscribe']); + } + + /** + * 订阅处理逻辑 + * + * @param $redis + * @param string $chan + * @param string $message + */ + public function subscribe($redis, string $chan, string $message) + { + //echo PHP_EOL . "chan : $chan , msg : $message"; + $data = json_decode($message, true); + + $this->{self::EVENTS[$data['event']]}($data); + } + + public function isEnable($server): bool + { + return true; + } + + /** + * 对话聊天消息 + * + * @param array $data 队列消息 + * @return string + */ + public function onConsumeTalk(array $data): string + { + $talk_type = $data['data']['talk_type']; + $sender_id = $data['data']['sender_id']; + $receiver_id = $data['data']['receiver_id']; + $record_id = $data['data']['record_id']; + + $fds = []; + $groupInfo = null; + + if ($talk_type == TalkType::PRIVATE_CHAT) { + $fds = array_merge( + $this->socketClientService->findUserFds($sender_id), + $this->socketClientService->findUserFds($receiver_id) + ); + } else if ($talk_type == TalkType::GROUP_CHAT) { + foreach (SocketRoom::getInstance()->getRoomMembers(strval($receiver_id)) as $uid) { + $fds = array_merge($fds, $this->socketClientService->findUserFds(intval($uid))); + } + + $groupInfo = Group::where('id', $receiver_id)->first(['group_name', 'avatar']); + } + + // 客户端ID去重 + if (!$fds = array_unique($fds)) { + return Result::ACK; + } + + $result = TalkRecords::leftJoin('users', 'users.id', '=', 'talk_records.user_id') + ->where('talk_records.id', $record_id) + ->first([ + 'talk_records.id', + 'talk_records.talk_type', + 'talk_records.msg_type', + 'talk_records.user_id', + 'talk_records.receiver_id', + 'talk_records.content', + 'talk_records.is_revoke', + 'talk_records.created_at', + 'users.nickname', + 'users.avatar', + ]); + + if (!$result) return Result::ACK; + + $file = $code_block = $forward = $invite = []; + + switch ($result->msg_type) { + case TalkMsgType::FILE_MESSAGE: + $file = TalkRecordsFile::where('record_id', $result->id)->first([ + 'id', 'record_id', 'user_id', 'file_source', 'file_type', + 'save_type', 'original_name', 'file_suffix', 'file_size', 'save_dir' + ]); + + $file = $file ? $file->toArray() : []; + $file && $file['file_url'] = get_media_url($file['save_dir']); + break; + + case TalkMsgType::FORWARD_MESSAGE: + $forward = ['num' => 0, 'list' => []]; + $forwardInfo = TalkRecordsForward::where('record_id', $result->id)->first(['records_id', 'text']); + if ($forwardInfo) { + $forward = [ + 'num' => count(parse_ids($forwardInfo->records_id)), + 'list' => json_decode($forwardInfo->text, true) ?? [] + ]; + } + break; + + case TalkMsgType::CODE_MESSAGE: + $code_block = TalkRecordsCode::where('record_id', $result->id)->first(['record_id', 'code_lang', 'code']); + $code_block = $code_block ? $code_block->toArray() : []; + break; + + case TalkMsgType::GROUP_INVITE_MESSAGE: + $notifyInfo = TalkRecordsInvite::where('record_id', $result->id)->first([ + 'record_id', 'type', 'operate_user_id', 'user_ids' + ]); + + $userInfo = User::where('id', $notifyInfo->operate_user_id)->first(['nickname', 'id']); + $invite = [ + 'type' => $notifyInfo->type, + 'operate_user' => ['id' => $userInfo->id, 'nickname' => $userInfo->nickname], + 'users' => User::whereIn('id', parse_ids($notifyInfo->user_ids))->get(['id', 'nickname'])->toArray() + ]; + + unset($notifyInfo, $userInfo); + break; + } + + $notify = [ + 'sender_id' => $sender_id, + 'receiver_id' => $receiver_id, + 'talk_type' => $talk_type, + 'data' => $this->formatTalkMessage([ + 'id' => $result->id, + 'talk_type' => $result->talk_type, + 'msg_type' => $result->msg_type, + "user_id" => $result->user_id, + "receiver_id" => $result->receiver_id, + 'avatar' => $result->avatar, + 'nickname' => $result->nickname, + 'group_name' => $groupInfo ? $groupInfo->group_name : '', + 'group_avatar' => $groupInfo ? $groupInfo->avatar : '', + "created_at" => $result->created_at, + "content" => $result->content, + "file" => $file, + "code_block" => $code_block, + 'forward' => $forward, + 'invite' => $invite + ]) + ]; + + $this->socketPushNotify($fds, json_encode([SocketConstants::EVENT_TALK, $notify])); + + return Result::ACK; + } + + /** + * 键盘输入事件消息 + * + * @param array $data 队列消息 + * @return string + */ + public function onConsumeKeyboard(array $data): string + { + $fds = $this->socketClientService->findUserFds($data['data']['receiver_id']); + + $this->socketPushNotify($fds, json_encode([SocketConstants::EVENT_KEYBOARD, $data['data']])); + + return Result::ACK; + } + + /** + * 用户上线或下线消息 + * + * @param array $data 队列消息 + * @return string + */ + public function onConsumeOnlineStatus(array $data): string + { + $user_id = (int)$data['data']['user_id']; + $status = (int)$data['data']['status']; + + $fds = []; + + $ids = container()->get(UserService::class)->getFriendIds($user_id); + foreach ($ids as $friend_id) { + $fds = array_merge($fds, $this->socketClientService->findUserFds(intval($friend_id))); + } + + $this->socketPushNotify(array_unique($fds), json_encode([ + SocketConstants::EVENT_ONLINE_STATUS, [ + 'user_id' => $user_id, + 'status' => $status + ] + ])); + + return Result::ACK; + } + + /** + * 撤销聊天消息 + * + * @param array $data 队列消息 + * @return string + */ + public function onConsumeRevokeTalk(array $data): string + { + $record = TalkRecords::where('id', $data['data']['record_id'])->first(['id', 'talk_type', 'user_id', 'receiver_id']); + + $fds = []; + if ($record->talk_type == TalkType::PRIVATE_CHAT) { + $fds = array_merge($fds, $this->socketClientService->findUserFds($record->user_id)); + $fds = array_merge($fds, $this->socketClientService->findUserFds($record->receiver_id)); + } else if ($record->talk_type == TalkType::GROUP_CHAT) { + $userIds = SocketRoom::getInstance()->getRoomMembers(strval($record->receiver_id)); + foreach ($userIds as $uid) { + $fds = array_merge($fds, $this->socketClientService->findUserFds((int)$uid)); + } + } + + $fds = array_unique($fds); + $this->socketPushNotify($fds, json_encode([SocketConstants::EVENT_REVOKE_TALK, [ + 'talk_type' => $record->talk_type, + 'sender_id' => $record->user_id, + 'receiver_id' => $record->receiver_id, + 'record_id' => $record->id, + ]])); + + return Result::ACK; + } + + /** + * 好友申请消息 + * + * @param array $data 队列消息 + * @return string + */ + public function onConsumeFriendApply(array $data): string + { + $data = $data['data']; + + $applyInfo = UsersFriendsApply::where('id', $data['apply_id'])->first(); + if (!$applyInfo) return Result::ACK; + + $fds = $this->socketClientService->findUserFds($data['type'] == 1 ? $applyInfo->friend_id : $applyInfo->user_id); + + if ($data['type'] == 1) { + $msg = [ + 'sender_id' => $applyInfo->user_id, + 'receiver_id' => $applyInfo->friend_id, + 'remark' => $applyInfo->remark, + ]; + } else { + $msg = [ + 'sender_id' => $applyInfo->friend_id, + 'receiver_id' => $applyInfo->user_id, + 'status' => $applyInfo->status, + 'remark' => $applyInfo->remark, + ]; + } + + $friendInfo = User::select(['id', 'avatar', 'nickname', 'mobile', 'motto'])->find($data['type'] == 1 ? $applyInfo->user_id : $applyInfo->friend_id); + + $msg['friend'] = [ + 'user_id' => $friendInfo->id, + 'avatar' => $friendInfo->avatar, + 'nickname' => $friendInfo->nickname, + 'mobile' => $friendInfo->mobile, + ]; + + $this->socketPushNotify(array_unique($fds), json_encode([SocketConstants::EVENT_FRIEND_APPLY, $msg])); + + return Result::ACK; + } + + /** + * WebSocket 消息推送 + * + * @param $fds + * @param $message + */ + private function socketPushNotify($fds, $message) + { + $server = server(); + foreach ($fds as $fd) { + $server->exist(intval($fd)) && $server->push(intval($fd), $message); + } + } + + /** + * 格式化对话的消息体 + * + * @param array $data 对话的消息 + * @return array + */ + private function formatTalkMessage(array $data): array + { + $message = [ + "id" => 0, // 消息记录ID + "talk_type" => 1, // 消息来源[1:好友私信;2:群聊] + "msg_type" => 1, // 消息类型 + "user_id" => 0, // 发送者用户ID + "receiver_id" => 0, // 接收者ID[好友ID或群ID] + + // 发送消息人的信息 + "nickname" => "",// 用户昵称 + "avatar" => "",// 用户头像 + "group_name" => "",// 群组名称 + "group_avatar" => "",// 群组头像 + + // 不同的消息类型 + "file" => [], + "code_block" => [], + "forward" => [], + "invite" => [], + + // 消息创建时间 + "content" => '',// 文本消息 + "created_at" => "", + + // 消息属性 + "is_revoke" => 0, // 消息是否撤销 + ]; + + return array_merge($message, array_intersect_key($data, $message)); + } +} diff --git a/app/Service/ContactApplyService.php b/app/Service/ContactApplyService.php index 4dff4a3..51b28cd 100644 --- a/app/Service/ContactApplyService.php +++ b/app/Service/ContactApplyService.php @@ -2,54 +2,144 @@ namespace App\Service; +use App\Model\User; +use App\Model\UsersFriend; use App\Model\UsersFriendsApply; +use App\Traits\PagingTrait; +use Hyperf\DbConnection\Db; class ContactApplyService { + use PagingTrait; + /** * 创建好友申请 * - * @param int $user_id 申请人用户ID + * @param int $user_id 用户ID * @param int $friend_id 朋友ID * @param string $remark 申请备注 - * @return bool + * @return array */ public function create(int $user_id, int $friend_id, string $remark) { - // 查询最后一次联系人申请 - $result = UsersFriendsApply::where('user_id', $user_id)->where('friend_id', $friend_id)->orderBy('id', 'desc')->first(); - if ($result && $result->status == 0) { - $result->remarks = $remark; - $result->updated_at = date('Y-m-d H:i:s'); - $result->save(); - return true; - } - $result = UsersFriendsApply::create([ 'user_id' => $user_id, 'friend_id' => $friend_id, 'status' => 0, - 'remarks' => $remark, + 'remark' => $remark, 'created_at' => date('Y-m-d H:i:s'), - 'updated_at' => date('Y-m-d H:i:s') + 'updated_at' => date('Y-m-d H:i:s'), ]); - return (bool)$result; + if (!$result) { + return [false, $result]; + } + + return [true, $result]; } /** * 同意好友申请 + * + * @param int $user_id 用户ID + * @param int $apply_id 申请记录ID */ - public function accept() + public function accept(int $user_id, int $apply_id, string $remarks = '') { + $info = UsersFriendsApply::where('id', $apply_id)->first(); + if (!$info || $info->friend_id != $user_id) { + return false; + } + Db::beginTransaction(); + try { + $info->status = 1; + $info->save(); + + UsersFriend::updateOrCreate([ + 'user_id' => $info->user_id, + 'friend_id' => $info->friend_id, + ], [ + 'status' => 1, + 'remark' => $remarks, + ]); + + UsersFriend::updateOrCreate([ + 'user_id' => $info->friend_id, + 'friend_id' => $info->user_id, + ], [ + 'status' => 1, + 'remark' => User::where('id', $info->user_id)->value('nickname'), + ]); + + Db::commit(); + } catch (\Exception $e) { + Db::rollBack(); + echo $e->getMessage(); + return false; + } + + return true; } /** * 拒绝好友申请 + * + * @param int $user_id 用户ID + * @param int $apply_id 申请记录ID */ - public function decline() + public function decline(int $user_id, int $apply_id, string $reason = '') { + return (bool)UsersFriendsApply::where('id', $apply_id)->where('friend_id', $user_id)->update([ + 'status' => 2, + 'reason' => $reason, + 'updated_at' => date('Y-m-d H:i:s'), + ]); + } + /** + * 删除好友申请 + * + * @param int $user_id + * @param int $apply_id + * @return bool + */ + public function delete(int $user_id, int $apply_id) + { + return (bool)UsersFriendsApply::where('id', $apply_id)->where('friend_id', $user_id)->delete(); + } + + /** + * 获取联系人申请记录 + * + * @param int $user_id 用户ID + * @param int $page 当前分页 + * @param int $page_size 分页大小 + * @return array + */ + public function getApplyRecords(int $user_id, $page = 1, $page_size = 30): array + { + $rowsSqlObj = UsersFriendsApply::select([ + 'users_friends_apply.id', + 'users_friends_apply.status', + 'users_friends_apply.remark', + 'users.nickname', + 'users.avatar', + 'users.mobile', + 'users_friends_apply.user_id', + 'users_friends_apply.friend_id', + 'users_friends_apply.created_at' + ]); + + $rowsSqlObj->leftJoin('users', 'users.id', '=', 'users_friends_apply.user_id'); + $rowsSqlObj->where('users_friends_apply.friend_id', $user_id); + + $count = $rowsSqlObj->count(); + $rows = []; + if ($count > 0) { + $rows = $rowsSqlObj->orderBy('users_friends_apply.id', 'desc')->forPage($page, $page_size)->get()->toArray(); + } + + return $this->getPagingRows($rows, $count, $page, $page_size); } } diff --git a/app/Service/ContactsService.php b/app/Service/ContactsService.php index e51073c..8294868 100644 --- a/app/Service/ContactsService.php +++ b/app/Service/ContactsService.php @@ -12,10 +12,7 @@ namespace App\Service; use App\Model\User; use App\Model\UsersFriend; -use App\Model\UsersFriendsApply; use App\Traits\PagingTrait; -use Hyperf\DbConnection\Db; -use Exception; /** * ContactsService @@ -27,43 +24,6 @@ class ContactsService extends BaseService { use PagingTrait; - /** - * 添加联系人/申请 - * - * @param int $user_id 用户ID - * @param int $friend_id 好友ID - * @param string $remarks 申请备注 - * @return bool - */ - public function addContact(int $user_id, int $friend_id, string $remarks): bool - { - // 判断是否是好友关系 - if (UsersFriend::isFriend($user_id, $friend_id)) return true; - - // 查询最后一次联系人申请 - $result = UsersFriendsApply::where('user_id', $user_id) - ->where('friend_id', $friend_id) - ->orderBy('id', 'desc')->first(); - - if ($result && $result->status == 0) { - $result->remarks = $remarks; - $result->updated_at = date('Y-m-d H:i:s'); - $result->save(); - return true; - } else { - $result = UsersFriendsApply::create([ - 'user_id' => $user_id, - 'friend_id' => $friend_id, - 'status' => 0, - 'remarks' => $remarks, - 'created_at' => date('Y-m-d H:i:s'), - 'updated_at' => date('Y-m-d H:i:s') - ]); - - return (bool)$result; - } - } - /** * 删除联系人 * @@ -71,102 +31,16 @@ class ContactsService extends BaseService * @param int $friend_id 好友ID * @return bool */ - public function deleteContact(int $user_id, int $friend_id): bool + public function delete(int $user_id, int $friend_id): bool { - if (!UsersFriend::isFriend($user_id, $friend_id)) { - return false; - } - - $data = ['status' => 0]; - - // 用户ID比大小交换位置 - if ($user_id > $friend_id) { - [$user_id, $friend_id] = [$friend_id, $user_id]; - } - - return (bool)UsersFriend::where('user1', $user_id)->where('user2', $friend_id)->update($data); - } - - /** - * 同意添加联系人 / 联系人申请 - * - * @param int $user_id 用户ID - * @param int $apply_id 联系人申请ID - * @param string $remarks 联系人备注名称 - * @return bool - */ - public function acceptInvitation(int $user_id, int $apply_id, string $remarks = ''): bool - { - $info = UsersFriendsApply::where('id', $apply_id) - ->where('friend_id', $user_id) - ->where('status', 0) - ->orderBy('id', 'desc') - ->first(['user_id', 'friend_id']); - - if (!$info) return false; - - Db::beginTransaction(); - try { - $res = UsersFriendsApply::where('id', $apply_id)->update(['status' => 1, 'updated_at' => date('Y-m-d H:i:s')]); - if (!$res) { - throw new Exception('更新好友申请表信息失败'); - } - - // 判断大小 交换 user1,user2 的位置 - [$user1, $user2] = $info->user_id < $info->friend_id ? [$info->user_id, $info->friend_id] : [$info->friend_id, $info->user_id]; - - // 查询是否存在好友记录 - $friendResult = UsersFriend::where([ - ['user1', '=', $user1], - ['user2', '=', $user2] - ])->first(['id', 'user1', 'user2', 'active', 'status']); - - if ($friendResult) { - $active = ($friendResult->user1 == $info->user_id && $friendResult->user2 == $info->friend_id) ? 1 : 2; - UsersFriend::where('id', $friendResult->id)->update(['active' => $active, 'status' => 1]); - } else { - $friend_nickname = User::where('id', $info->friend_id)->value('nickname'); - - UsersFriend::create([ - 'user1' => $user1, - 'user2' => $user2, - 'user1_remark' => $user1 == $user_id ? $remarks : $friend_nickname, - 'user2_remark' => $user2 == $user_id ? $remarks : $friend_nickname, - 'active' => $user1 == $user_id ? 2 : 1, - 'status' => 1, - 'agree_time' => date('Y-m-d H:i:s'), - 'created_at' => date('Y-m-d H:i:s') - ]); - } - - Db::commit(); - } catch (Exception $e) { - Db::rollBack(); - return false; - } - - return true; - } - - /** - * 拒绝添加联系人 / 联系人申请 - * - * @param int $user_id 用户ID - * @param int $apply_id 联系人申请ID - * @param string $remarks 拒绝申请备注信息 - * @return bool - */ - public function declineInvitation(int $user_id, int $apply_id, string $remarks = ''): bool - { - return (bool)UsersFriendsApply::where([ - ['id', '=', $apply_id], - ['user_id', '=', $user_id], - ['status', '=', 2], - ])->update([ - 'status' => 2, - 'remarks' => $remarks, + $res = (bool)UsersFriend::where('user_id', $user_id)->where('friend_id', $friend_id)->where('status', 1)->update([ + 'status' => 0, 'updated_at' => date('Y-m-d H:i:s') ]); + + if ($res) redis()->del("good_friends:{$user_id}_{$friend_id}"); + + return $res; } /** @@ -177,9 +51,12 @@ class ContactsService extends BaseService * @param string $remark 好友备注名称 * @return bool */ - public function editContactRemark(int $user_id, int $friend_id, string $remark): bool + public function editRemark(int $user_id, int $friend_id, string $remark): bool { - return (bool)UsersFriend::where('user_id', $user_id)->where('friend_id', $friend_id)->update(['remark' => $remark]); + return (bool)UsersFriend::where('user_id', $user_id)->where('friend_id', $friend_id)->update([ + 'remark' => $remark, + 'updated_at' => date('Y-m-d H:i:s') + ]); } /** @@ -194,51 +71,4 @@ class ContactsService extends BaseService return $user ? $user->toArray() : []; } - - /** - * 获取联系人申请记录 - * - * @param int $user_id 用户ID - * @param int $page 当前分页 - * @param int $page_size 分页大小 - * @return array - */ - public function getContactApplyRecords(int $user_id, $page = 1, $page_size = 30): array - { - $rowsSqlObj = UsersFriendsApply::select([ - 'users_friends_apply.id', - 'users_friends_apply.status', - 'users_friends_apply.remarks', - 'users.nickname', - 'users.avatar', - 'users.mobile', - 'users_friends_apply.user_id', - 'users_friends_apply.friend_id', - 'users_friends_apply.created_at' - ]); - - $rowsSqlObj->leftJoin('users', 'users.id', '=', 'users_friends_apply.user_id'); - $rowsSqlObj->where('users_friends_apply.friend_id', $user_id); - - $count = $rowsSqlObj->count(); - $rows = []; - if ($count > 0) { - $rows = $rowsSqlObj->orderBy('users_friends_apply.id', 'desc')->forPage($page, $page_size)->get()->toArray(); - } - - return $this->getPagingRows($rows, $count, $page, $page_size); - } - - /** - * 删除联系人申请记录 - * - * @param int $user_id 用户ID - * @param int $apply_id 联系人好友申请ID - * @return bool - * @throws Exception - */ - public function delContactApplyRecord(int $user_id, int $apply_id): bool - { - return (bool)UsersFriendsApply::where('id', $apply_id)->where('friend_id', $user_id)->delete(); - } } diff --git a/app/Service/TalkService.php b/app/Service/TalkService.php index 0b7a4d3..1778ec2 100644 --- a/app/Service/TalkService.php +++ b/app/Service/TalkService.php @@ -26,24 +26,6 @@ class TalkService extends BaseService { use PagingTrait; - /** - * 获取好友备注 - * - * @param int $user_id 用户ID - * @param int $friend_id 好友ID - * @return string - */ - public function getFriendRemark(int $user_id, int $friend_id) - { - $remark = FriendRemark::getInstance()->read($user_id, $friend_id); - if ($remark) return $remark; - - $remark = UsersFriend::where('user_id', $user_id)->where('friend_id', $friend_id)->value('remark'); - if ($remark) FriendRemark::getInstance()->save($user_id, $friend_id, $remark); - - return (string)$remark; - } - /** * 获取用户的聊天列表 * @@ -95,7 +77,7 @@ class TalkService extends BaseService $data['avatar'] = $item['user_avatar']; $data['unread_num'] = UnreadTalk::getInstance()->read($item['receiver_id'], $user_id); $data['is_online'] = $socketFDService->isOnlineAll($item['receiver_id'], $runIdAll); - $data['remark_name'] = $this->getFriendRemark($user_id, (int)$item['receiver_id']); + $data['remark_name'] = UsersFriend::getFriendRemark($user_id, (int)$item['receiver_id']); } else { $data['name'] = strval($item['group_name']); $data['avatar'] = $item['group_avatar']; diff --git a/app/Service/UserService.php b/app/Service/UserService.php index 0cf0123..ffbaedf 100644 --- a/app/Service/UserService.php +++ b/app/Service/UserService.php @@ -118,21 +118,17 @@ class UserService extends BaseService if (!$info) return []; $info = $info->toArray(); - $info['friend_status'] = 0;//朋友关系状态 0:本人 1:陌生人 2:朋友 + $info['friend_status'] = 0;//朋友关系状态[0:本人;1:陌生人;2:朋友;] $info['nickname_remark'] = ''; $info['friend_apply'] = 0; // 判断查询信息是否是自己 if ($friend_id != $me_user_id) { - $friendInfo = UsersFriend:: - where('user1', '=', $friend_id > $me_user_id ? $me_user_id : $friend_id) - ->where('user2', '=', $friend_id < $me_user_id ? $me_user_id : $friend_id) - ->where('status', 1) - ->first(['id', 'user1', 'user2', 'active', 'user1_remark', 'user2_remark']); + $is_friend = UsersFriend::isFriend($me_user_id, $friend_id, true, true); - $info['friend_status'] = $friendInfo ? 2 : 1; - if ($friendInfo) { - $info['nickname_remark'] = $friendInfo->user1 == $friend_id ? $friendInfo->user2_remark : $friendInfo->user1_remark; + $info['friend_status'] = $is_friend ? 2 : 1; + if ($is_friend) { + $info['nickname_remark'] = UsersFriend::getFriendRemark($me_user_id, $friend_id); } else { $res = UsersFriendsApply::where('user_id', $me_user_id) ->where('friend_id', $friend_id) @@ -146,4 +142,35 @@ class UserService extends BaseService return $info; } + + /** + * 获取用户好友列表 + * + * @param int $user_id 用户ID + * @return array + */ + public function getUserFriends(int $user_id) + { + return UsersFriend::leftJoin('users', 'users.id', '=', 'users_friends.friend_id') + ->where('user_id', $user_id)->where('users_friends.status', 1) + ->get([ + 'users.id', + 'users.nickname', + 'users.avatar', + 'users.motto', + 'users.gender', + 'users_friends.remark as friend_remark', + ])->toArray(); + } + + /** + * 获取指定用户的所有朋友的用户ID + * + * @param int $user_id 指定用户ID + * @return array + */ + public function getFriendIds(int $user_id) + { + return UsersFriend::where('user_id', $user_id)->where('status', 1)->pluck('friend_id')->toArray(); + } } diff --git a/app/helper.php b/app/helper.php index a074f4f..b0d3048 100644 --- a/app/helper.php +++ b/app/helper.php @@ -251,9 +251,21 @@ function parse_ids($ids) */ function push_amqp(ProducerMessage $message, bool $confirm = false, int $timeout = 5) { + push_redis_subscribe('websocket', $message->getPayload()); return container()->get(Producer::class)->produce($message, $confirm, $timeout); } +/** + * 推送消息到 Redis 订阅中 + * + * @param string $chan + * @param string|array $message + */ +function push_redis_subscribe(string $chan, $message) +{ + redis()->publish($chan, is_string($message) ? $message : json_encode($message)); +} + /** * 生成随机文件名 *