初始化

main
gzydong 2020-11-29 14:44:11 +08:00
parent a1ecb35f61
commit 8ce33cada3
25 changed files with 169 additions and 247 deletions

View File

@ -4,8 +4,6 @@ declare(strict_types=1);
namespace App\Amqp\Consumer;
use App\Model\Chat\ChatRecordsForward;
use App\Model\UsersFriend;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Message\ConsumerMessage;
@ -14,18 +12,20 @@ use Hyperf\Amqp\Message\Type;
use Hyperf\Amqp\Builder\QueueBuilder;
use PhpAmqpLib\Message\AMQPMessage;
use App\Model\User;
use App\Helper\PushMessageHelper;
use App\Model\UsersFriend;
use App\Model\Chat\ChatRecord;
use App\Model\Chat\ChatRecordsCode;
use App\Model\Chat\ChatRecordsFile;
use App\Model\Chat\ChatRecordsInvite;
use App\Service\SocketFDService;
use App\Model\Chat\ChatRecordsForward;
use App\Service\SocketClientService;
use App\Service\SocketRoomService;
use App\Helper\PushMessageHelper;
/**
* 消息推送消费者队列
*
* @Consumer(name="聊天消息消费者",enable=true)
* @Consumer(name="ConsumerChat",enable=true)
*/
class ChatMessageConsumer extends ConsumerMessage
{
@ -51,9 +51,9 @@ class ChatMessageConsumer extends ConsumerMessage
public $routingKey = 'consumer:im:message';
/**
* @var SocketFDService
* @var SocketClientService
*/
private $socketFDService;
private $socketClientService;
/**
* @var SocketRoomService
@ -82,14 +82,14 @@ class ChatMessageConsumer extends ConsumerMessage
/**
* ChatMessageConsumer constructor.
* @param SocketFDService $socketFDService
* @param SocketClientService $socketClientService
* @param SocketRoomService $socketRoomService
*/
public function __construct(SocketFDService $socketFDService, SocketRoomService $socketRoomService)
public function __construct(SocketClientService $socketClientService, SocketRoomService $socketRoomService)
{
$this->socketFDService = $socketFDService;
$this->socketClientService = $socketClientService;
$this->socketRoomService = $socketRoomService;
$this->setQueue('queue:im-message:' . SERVER_RUN_ID);
$this->setQueue('queue:im:message:' . SERVER_RUN_ID);
}
/**
@ -139,19 +139,19 @@ class ChatMessageConsumer extends ConsumerMessage
$source = $data['data']['source'];
$fids = $this->socketFDService->findUserFds($data['data']['sender']);
$fds = $this->socketClientService->findUserFds($data['data']['sender']);
if ($source == 1) {// 私聊
$fids = array_merge($fids, $this->socketFDService->findUserFds($data['data']['receive']));
$fds = array_merge($fds, $this->socketClientService->findUserFds($data['data']['receive']));
} else if ($source == 2) {//群聊
$userIds = $this->socketRoomService->getRoomMembers(strval($data['data']['receive']));
foreach ($userIds as $uid) {
$fids = array_merge($fids, $this->socketFDService->findUserFds(intval($uid)));
$fds = array_merge($fds, $this->socketClientService->findUserFds(intval($uid)));
}
}
// 去重
$fids = array_unique($fids);
if (empty($fids)) {
$fds = array_unique($fds);
if (empty($fds)) {
return Result::ACK;
}
@ -245,12 +245,12 @@ class ChatMessageConsumer extends ConsumerMessage
])
];
$server = server();
foreach ($fids as $fd) {
$fd = intval($fd);
if ($server->exist($fd)) {
$server->push($fd, json_encode(['event_talk', $msg]));
}
unset($result, $file, $code_block, $forward, $invite);
$server = websocket();
$notify = json_encode(['event_talk', $msg]);
foreach ($fds as $fd) {
$server->exist($fd) && $server->push($fd, $notify);
}
return Result::ACK;
@ -265,13 +265,11 @@ class ChatMessageConsumer extends ConsumerMessage
*/
public function onConsumeKeyboard(array $data, AMQPMessage $message)
{
$fds = $this->socketFDService->findUserFds($data['data']['receive_user']);
$server = server();
$fds = $this->socketClientService->findUserFds($data['data']['receive_user']);
$server = websocket();
$notify = json_encode(['event_keyboard', $data['data']]);
foreach ($fds as $fd) {
$fd = intval($fd);
if ($server->exist($fd)) {
$server->push($fd, json_encode(['event_keyboard', $data['data']]));
}
$server->exist($fd) && $server->push($fd, $notify);
}
return Result::ACK;
@ -291,16 +289,14 @@ class ChatMessageConsumer extends ConsumerMessage
$fds = [];
foreach ($friends as $friend_id) {
$fds = array_merge($fds, $this->socketFDService->findUserFds(intval($friend_id)));
$fds = array_merge($fds, $this->socketClientService->findUserFds(intval($friend_id)));
}
$fds = array_unique($fds);
$server = server();
$server = websocket();
$notify = json_encode(['event_online_status', $data['data']]);
foreach ($fds as $fd) {
$fd = intval($fd);
if ($server->exist($fd)) {
$server->push($fd, json_encode(['event_online_status', $data['data']]));
}
$server->exist($fd) && $server->push($fd, $notify);
}
return Result::ACK;
@ -320,27 +316,26 @@ class ChatMessageConsumer extends ConsumerMessage
$fds = [];
if ($record->source == 1) {
$fds = array_merge($fds, $this->socketFDService->findUserFds($record->user_id));
$fds = array_merge($fds, $this->socketFDService->findUserFds($record->receive_id));
$fds = array_merge($fds, $this->socketClientService->findUserFds((int)$record->user_id));
$fds = array_merge($fds, $this->socketClientService->findUserFds((int)$record->receive_id));
} else if ($record->source == 2) {
$userIds = $this->socketRoomService->getRoomMembers(strval($record->receive_id));
foreach ($userIds as $uid) {
$fds = array_merge($fds, $this->socketFDService->findUserFds(intval($uid)));
$fds = array_merge($fds, $this->socketClientService->findUserFds((int)$uid));
}
}
$fds = array_unique($fds);
$server = server();
foreach ($fds as $fd) {
$fd = intval($fd);
if ($server->exist($fd)) {
$server->push($fd, json_encode(['event_revoke_talk', [
$server = websocket();
$notify = json_encode(['event_revoke_talk', [
'record_id' => $record->id,
'source' => $record->source,
'user_id' => $record->user_id,
'receive_id' => $record->receive_id,
]]));
}
]]);
foreach ($fds as $fd) {
$server->exist($fd) && $server->push($fd, $notify);
}
return Result::ACK;
@ -349,20 +344,19 @@ class ChatMessageConsumer extends ConsumerMessage
/**
* 好友申请消息
*
* @param array $data
* @param array $data 队列消息
* @param AMQPMessage $message
* @return string
*/
public function onConsumeFriendApply(array $data, AMQPMessage $message)
{
$fds = $this->socketFDService->findUserFds($data['data']['receive']);
$fds = $this->socketClientService->findUserFds($data['data']['receive']);
$fds = array_unique($fds);
$server = server();
$server = websocket();
$notify = json_encode(['event_friend_apply', $data['data']]);
foreach ($fds as $fd) {
$fd = intval($fd);
if ($server->exist($fd)) {
$server->push($fd, json_encode(['event_friend_apply', $data['data']]));
}
$server->exist($fd) && $server->push($fd, $notify);
}
return Result::ACK;

View File

@ -8,19 +8,18 @@ use Hyperf\Amqp\Message\ProducerMessage;
use Hyperf\Amqp\Message\Type;
use Hyperf\Utils\Str;
/**
* 消息生产者
*
* @package App\Amqp\Producer
*/
class ChatMessageProducer extends ProducerMessage
{
public $exchange = 'im.message.fanout';
// 交换机类型
public $type = Type::FANOUT;
const EVENTS = [
'event_talk',
'event_keyboard',
'event_online_status',
'event_revoke_talk',
'event_friend_apply'
];
// 交换机名称
public $exchange = 'im.message.fanout';
/**
* 初始化处理...
@ -31,10 +30,6 @@ class ChatMessageProducer extends ProducerMessage
*/
public function __construct(string $event, array $data, array $options = [])
{
if (!in_array($event, self::EVENTS)) {
new \Exception('事件名未注册...');
}
$message = [
'uuid' => $this->uuid(),// 自定义消息ID
'event' => $event,
@ -46,12 +41,12 @@ class ChatMessageProducer extends ProducerMessage
}
/**
* 生成唯一ID
* 生成唯一的消息ID
*
* @return string
*/
private function uuid()
{
return Str::random() . rand(100000, 999999) . uniqid();
return Str::random() . mt_rand(100000, 999999) . uniqid();
}
}

View File

@ -2,8 +2,6 @@
namespace App\Bootstrap;
use App\Support\Packet;
use App\Support\SocketIOParser;
use Hyperf\Framework\Bootstrap\ServerStartCallback;
use Swoole\Timer;
use Hyperf\Redis\Redis;
@ -24,13 +22,16 @@ class ServerStart extends ServerStartCallback
stdout_log()->info(sprintf('服务运行ID : %s', SERVER_RUN_ID));
// 维护服务运行状态
$this->setTimeOut();
$this->setRunIdTime();
Timer::tick(15000, function () {
$this->setTimeOut();
$this->setRunIdTime();
});
}
public function setTimeOut()
/**
* 更新运行ID时间
*/
public function setRunIdTime()
{
container()->get(Redis::class)->hset('SERVER_RUN_ID', SERVER_RUN_ID, time());
}

View File

@ -4,10 +4,10 @@ declare(strict_types=1);
namespace App\Command;
use App\Service\SocketFDService;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Command\Annotation\Command;
use Psr\Container\ContainerInterface;
use App\Service\SocketClientService;
/**
* @Command
@ -34,12 +34,13 @@ class RemoveWsCacheCommand extends HyperfCommand
public function handle()
{
$socket = new SocketFDService();
$arr= $socket->getServerRunIdAll(2);
foreach ($arr as $run_id=>$value){
$socket = new SocketClientService();
$this->line('此过程可能耗时较长,请耐心等待!', 'info');
$arr = $socket->getServerRunIdAll(2);
foreach ($arr as $run_id => $value) {
go(function () use ($socket, $run_id) {
$socket->removeRedisCache($run_id);
});
}
$this->line('缓存已清除!', 'info');

View File

@ -1,12 +1,5 @@
<?php
/**
* Created by PhpStorm.
* User: admin
* Date: 2020/11/4
* Time: 11:43
*/
namespace App\Constants;
use Hyperf\Constants\AbstractConstants;

View File

@ -48,7 +48,7 @@ abstract class AbstractController
protected $validationFactory;
/**
* 自定义验证器
* 自定义控制器验证器
*
* @param mixed ...$arg
*/

View File

@ -3,14 +3,14 @@ declare(strict_types=1);
namespace App\Controller\Api\V1;
use App\Service\ArticleService;
use App\Service\UploadService;
use App\Support\RedisLock;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\RequestMapping;
use Hyperf\HttpServer\Annotation\Middleware;
use App\Middleware\JWTAuthMiddleware;
use App\Service\ArticleService;
use App\Service\UploadService;
use App\Support\RedisLock;
use Hyperf\Utils\Str;
/**

View File

@ -2,16 +2,16 @@
namespace App\Controller\Api\V1;
use App\Constants\ResponseCode;
use App\Model\User;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\RequestMapping;
use Hyperf\HttpServer\Annotation\Middleware;
use App\Middleware\JWTAuthMiddleware;
use Phper666\JWTAuth\JWT;
use App\Constants\ResponseCode;
use App\Model\User;
use App\Service\UserService;
use App\Service\SmsCodeService;
use Phper666\JWTAuth\JWT;
use App\Middleware\JWTAuthMiddleware;
/**
* 授权相关控制器

View File

@ -2,9 +2,9 @@
namespace App\Controller\Api\V1;
use Hyperf\Di\Annotation\Inject;
use App\Controller\AbstractController;
use App\Support\Response;
use Hyperf\Di\Annotation\Inject;
use Phper666\JWTAuth\JWT;
/**

View File

@ -2,14 +2,14 @@
namespace App\Controller\Api\V1;
use App\Constants\ResponseCode;
use App\Model\Emoticon;
use App\Model\EmoticonDetail;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\RequestMapping;
use Hyperf\HttpServer\Annotation\Middleware;
use App\Middleware\JWTAuthMiddleware;
use App\Constants\ResponseCode;
use App\Model\Emoticon;
use App\Model\EmoticonDetail;
use App\Service\EmoticonService;
/**

View File

@ -2,20 +2,20 @@
namespace App\Controller\Api\V1;
use App\Model\UsersFriend;
use App\Service\SocketRoomService;
use Hyperf\Amqp\Producer;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\RequestMapping;
use Hyperf\HttpServer\Annotation\Middleware;
use App\Middleware\JWTAuthMiddleware;
use App\Service\GroupService;
use Hyperf\Amqp\Producer;
use App\Model\UsersFriend;
use App\Model\UsersChatList;
use App\Model\Group\UsersGroup;
use App\Model\Group\UsersGroupMember;
use App\Model\Group\UsersGroupNotice;
use App\Amqp\Producer\ChatMessageProducer;
use App\Service\SocketRoomService;
use App\Service\GroupService;
/**
* Class GroupController

View File

@ -2,8 +2,14 @@
namespace App\Controller\Api\V1;
use App\Cache\LastMsgCache;
use App\Cache\UnreadTalkCache;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\RequestMapping;
use Hyperf\HttpServer\Annotation\Middleware;
use App\Middleware\JWTAuthMiddleware;
use Hyperf\Amqp\Producer;
use Hyperf\Utils\Str;
use Psr\Http\Message\ResponseInterface;
use App\Model\EmoticonDetail;
use App\Model\FileSplitUpload;
use App\Model\User;
@ -12,15 +18,9 @@ use App\Model\UsersFriend;
use App\Model\Group\UsersGroup;
use App\Service\TalkService;
use App\Service\UploadService;
use Hyperf\Amqp\Producer;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\RequestMapping;
use Hyperf\HttpServer\Annotation\Middleware;
use App\Middleware\JWTAuthMiddleware;
use Hyperf\Utils\Str;
use Psr\Http\Message\ResponseInterface;
use App\Amqp\Producer\ChatMessageProducer;
use App\Cache\LastMsgCache;
use App\Cache\UnreadTalkCache;
/**
* Class TalkController
@ -227,7 +227,7 @@ class TalkController extends CController
]);
[$isTrue, $message,] = $this->talkService->revokeRecord($this->uid(), $params['record_id']);
if($isTrue){
if ($isTrue) {
$this->producer->produce(
new ChatMessageProducer('event_revoke_talk', [
'record_id' => $params['record_id']
@ -379,7 +379,7 @@ class TalkController extends CController
return $this->response->success([
'rows' => $result,
'record_id' => $result ? $result[count($result) - 1]['id'] : 0,
'record_id' => $result ? end($result)['id'] : 0,
'limit' => $limit
]);
}
@ -448,7 +448,7 @@ class TalkController extends CController
return $this->response->success([
'rows' => $result,
'record_id' => $result ? $result[count($result) - 1]['id'] : 0,
'record_id' => $result ? end($result)['id'] : 0,
'limit' => $limit
]);
}

View File

@ -3,13 +3,13 @@ declare(strict_types=1);
namespace App\Controller\Api\V1;
use App\Service\SplitUploadService;
use App\Service\UploadService;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\RequestMapping;
use Hyperf\HttpServer\Annotation\Middleware;
use App\Middleware\JWTAuthMiddleware;
use App\Service\SplitUploadService;
use App\Service\UploadService;
/**
* 上传控制器

View File

@ -2,24 +2,24 @@
namespace App\Controller\Api\V1;
use App\Cache\ApplyNumCache;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\RequestMapping;
use Hyperf\HttpServer\Annotation\Middleware;
use App\Middleware\JWTAuthMiddleware;
use Hyperf\Amqp\Producer;
use App\Constants\ResponseCode;
use App\Helper\Hash;
use App\Model\User;
use App\Model\UsersChatList;
use App\Model\UsersFriend;
use App\Service\SmsCodeService;
use App\Support\SendEmailCode;
use App\Service\FriendService;
use App\Service\UserService;
use App\Service\SocketFDService;
use App\Service\SocketClientService;
use App\Service\SmsCodeService;
use App\Amqp\Producer\ChatMessageProducer;
use Hyperf\Amqp\Producer;
use App\Cache\ApplyNumCache;
/**
* Class UsersController
@ -45,9 +45,9 @@ class UsersController extends CController
/**
* @inject
* @var SocketFDService
* @var SocketClientService
*/
private $socketFDService;
private $socketClientService;
/**
* @Inject
@ -64,9 +64,9 @@ class UsersController extends CController
{
$rows = UsersFriend::getUserFriends($this->uid());
$runArr = $this->socketFDService->getServerRunIdAll();
$runArr = $this->socketClientService->getServerRunIdAll();
foreach ($rows as $k => $row) {
$rows[$k]['online'] = $this->socketFDService->isOnlineAll($row['id'], $runArr);
$rows[$k]['online'] = $this->socketClientService->isOnlineAll($row['id'], $runArr);
}
return $this->response->success($rows);
@ -264,7 +264,7 @@ class UsersController extends CController
ApplyNumCache::setInc((int)$params['friend_id']);
//判断对方是否在线。如果在线发送消息通知
if ($this->socketFDService->isOnlineAll((int)$params['friend_id'])) {
if ($this->socketClientService->isOnlineAll((int)$params['friend_id'])) {
$this->producer->produce(
new ChatMessageProducer('event_friend_apply', [
'sender' => $user_id,
@ -299,7 +299,7 @@ class UsersController extends CController
}
//判断对方是否在线。如果在线发送消息通知
if ($this->socketFDService->isOnlineAll((int)$params['friend_id'])) {
if ($this->socketClientService->isOnlineAll((int)$params['friend_id'])) {
// $this->producer->produce(
// new ChatMessageProducer('event_friend_apply', [
// 'sender' => $user_id,

View File

@ -27,11 +27,4 @@ class IndexController extends AbstractController
'message' => "Hello {$user}.",
];
}
public function upload(ResponseInterface $response)
{
return [
'method' => 'upload',
];
}
}

View File

@ -7,14 +7,13 @@ use Hyperf\Di\Annotation\Inject;
use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Phper666\JWTAuth\JWT;
use Hyperf\Amqp\Producer;
use Swoole\Http\Request;
use Swoole\Websocket\Frame;
use Hyperf\Amqp\Producer;
use Swoole\Http\Response;
use Swoole\WebSocket\Server;
use App\Traits\WebSocketTrait;
use App\Service\SocketFDService;
use Phper666\JWTAuth\JWT;
use App\Service\SocketClientService;
use App\Service\MessageHandleService;
use App\Service\SocketRoomService;
use App\Model\Group\UsersGroupMember;
@ -26,8 +25,6 @@ use App\Amqp\Producer\ChatMessageProducer;
*/
class WebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
use WebSocketTrait;
/**
* @Inject
* @var JWT
@ -42,9 +39,9 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
/**
* @inject
* @var SocketFDService
* @var SocketClientService
*/
private $socketFDService;
private $socketClientService;
/**
* @inject
@ -58,6 +55,7 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
*/
private $messageHandleService;
// 消息事件绑定
const EVENTS = [
'event_talk' => 'onTalk',
'event_keyboard' => 'onKeyboard',
@ -76,10 +74,10 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
stdout_log()->notice("用户连接信息 : user_id:{$userInfo['user_id']} | fd:{$request->fd} 时间:" . date('Y-m-d H:i:s'));
// 判断是否存在异地登录
$isOnline = $this->socketFDService->isOnlineAll(intval($userInfo['user_id']));
$isOnline = $this->socketClientService->isOnlineAll(intval($userInfo['user_id']));
// 绑定fd与用户关系
$this->socketFDService->bindRelation($request->fd, $userInfo['user_id']);
$this->socketClientService->bindRelation($request->fd, $userInfo['user_id']);
// 加入群聊
$groupIds = UsersGroupMember::getUserGroupIds($userInfo['user_id']);
@ -113,7 +111,7 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
[$event, $data] = array_values(json_decode($frame->data, true));
if (isset(self::EVENTS[$event])) {
$this->messageHandleService->{self::EVENTS[$event]}($server, $frame, $data);
call_user_func_array([$this->messageHandleService, self::EVENTS[$event]], [$server, $frame, $data]);
return;
}
}
@ -127,16 +125,15 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
*/
public function onClose($server, int $fd, int $reactorId): void
{
$user_id = $this->socketFDService->findFdUserId($fd);
$user_id = $this->socketClientService->findFdUserId($fd);
stdout_log()->notice("客户端FD:{$fd} 已关闭连接 用户ID为【{$user_id}】,关闭时间:" . date('Y-m-d H:i:s'));
// 解除fd关系
$this->socketFDService->removeRelation($fd);
$this->socketClientService->removeRelation($fd);
// 判断是否存在异地登录
$isOnline = $this->socketFDService->isOnlineAll(intval($user_id));
// ... 不存在异地登录,推送下线通知消息
$isOnline = $this->socketClientService->isOnlineAll(intval($user_id));
if (!$isOnline) {
// 推送消息至队列
$this->producer->produce(

View File

@ -4,6 +4,7 @@ declare(strict_types=1);
namespace App\Exception\Handler;
use App\Constants\ResponseCode;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\ExceptionHandler\ExceptionHandler;
use Hyperf\HttpMessage\Stream\SwooleStream;
@ -27,7 +28,12 @@ class AppExceptionHandler extends ExceptionHandler
$this->logger->error(sprintf('%s[%s] in %s', $throwable->getMessage(), $throwable->getLine(), $throwable->getFile()));
$this->logger->error($throwable->getTraceAsString());
return $response->withHeader('Server', 'Hyperf')->withStatus(500)->withBody(new SwooleStream('Internal Server Error.'));
$data = json_encode([
'code' => ResponseCode::SERVER_ERROR,
'message' => 'Internal Server Error.'
], JSON_UNESCAPED_UNICODE);
return $response->withHeader('Server', 'Hyperf')->withStatus(500)->withBody(new SwooleStream($data));
}
/**

View File

@ -2,7 +2,6 @@
namespace App\Helper;
use App\Model\User;
/**
@ -12,16 +11,6 @@ use App\Model\User;
*/
class PushMessageHelper
{
// 消息事件类型
const events = [
'chat_message',//用户聊天消息
'friend_apply',//好友添加申请消息
'join_group', //入群消息
'login_notify',//好友登录消息通知
'input_tip',//好友登录消息通知
'revoke_records',//好友撤回消息通知
];
/**
* 格式化对话的消息体
*

View File

@ -14,9 +14,6 @@ namespace App\Process;
use Hyperf\AsyncQueue\Process\ConsumerProcess;
use Hyperf\Process\Annotation\Process;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
///**
// * @Process
// */
@ -24,36 +21,6 @@ class AsyncQueueConsumer extends ConsumerProcess
{
public function handle(): void
{
//建立一个到RabbitMQ服务器的连接
$connection = new AMQPStreamConnection('47.105.180.123', 5672, 'yuandong', 'yuandong','im');
$channel = $connection->channel();
$channel->exchange_declare('im.message.fanout', 'fanout', true, false, false);
list($queue_name, ,) = $channel->queue_declare("test", false, false, true, true);
$channel->queue_bind($queue_name, 'im.message.fanout','routing-key-test');
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
$callback = function($msg){
$data = json_decode($msg->body,true);
$server = server();
foreach ($server->connections as $fd) {
if ($server->exist($fd) && $server->isEstablished($fd)) {
$server->push($fd, "Recv: 我是后台进程 [{$data['message']}]");
}
}
};
$channel->basic_consume($queue_name, 'asfafa', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
}
}

View File

@ -11,6 +11,11 @@ use App\Traits\PagingTrait;
use Hyperf\DbConnection\Db;
use Exception;
/**
* 笔记服务层
*
* @package App\Service
*/
class ArticleService extends BaseService
{
use PagingTrait;
@ -41,7 +46,7 @@ class ArticleService extends BaseService
*/
public function getUserTags(int $user_id)
{
$items = ArticleTag::where('user_id', $user_id)->orderBy('id', 'desc')->get(['id', 'tag_name',Db::raw('0 as count')])->toArray();
$items = ArticleTag::where('user_id', $user_id)->orderBy('id', 'desc')->get(['id', 'tag_name', Db::raw('0 as count')])->toArray();
array_walk($items, function ($item) use ($user_id) {
$item['count'] = (int)Article::where('user_id', $user_id)->whereRaw("FIND_IN_SET({$item['id']},tags_id)")->count();
});

View File

@ -1,19 +1,18 @@
<?php
namespace App\Service;
use Hyperf\Di\Annotation\Inject;
use Hyperf\Amqp\Producer;
use Swoole\Http\Response;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;
use App\Amqp\Producer\ChatMessageProducer;
use App\Cache\LastMsgCache;
use App\Cache\UnreadTalkCache;
use App\Model\Chat\ChatRecord;
use App\Model\Group\UsersGroup;
use App\Model\UsersFriend;
use Hyperf\Amqp\Producer;
use Swoole\Http\Response;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;
class MessageHandleService
{
@ -25,9 +24,15 @@ class MessageHandleService
/**
* @inject
* @var SocketFDService
* @var SocketClientService
*/
private $socketFDService;
private $socketClientService;
/**
* @Inject
* @var UnreadTalkCache
*/
private $unreadTalkCache;
/**
* 对话消息
@ -39,9 +44,7 @@ class MessageHandleService
*/
public function onTalk($server, Frame $frame, $data)
{
// 当前用户ID
$user_id = $this->socketFDService->findFdUserId($frame->fd);
$user_id = $this->socketClientService->findFdUserId($frame->fd);
if ($user_id != $data['send_user']) {
return;
}
@ -73,7 +76,9 @@ class MessageHandleService
'created_at' => date('Y-m-d H:i:s'),
]);
if (!$result) return;
if (!$result) {
return;
}
// 判断是否私聊
if ($data['source_type'] == 1) {
@ -85,7 +90,7 @@ class MessageHandleService
], intval($data['receive_user']), intval($data['send_user']));
// 设置好友消息未读数
make(UnreadTalkCache::class)->setInc(intval($result->receive_id), strval($result->user_id));
$this->unreadTalkCache->setInc(intval($result->receive_id), strval($result->user_id));
}
$this->producer->produce(
@ -96,8 +101,6 @@ class MessageHandleService
'record_id' => $result->id
])
);
return true;
}
/**
@ -116,7 +119,5 @@ class MessageHandleService
'receive_user' => intval($data['receive_user']), //接收者ID
])
);
return true;
}
}

View File

@ -9,7 +9,7 @@ use Hyperf\Redis\Redis;
*
* @package App\Service
*/
class SocketFDService
class SocketClientService
{
/**
* fd与用户绑定(使用hash 做处理)
@ -117,7 +117,10 @@ class SocketFDService
*/
public function findUserFds(int $user_id, $run_id = SERVER_RUN_ID)
{
return $this->redis->smembers(sprintf('%s:%s:%s', self::BIND_USER_TO_FDS, $run_id, $user_id));
$arr = $this->redis->smembers(sprintf('%s:%s:%s', self::BIND_USER_TO_FDS, $run_id, $user_id));
return $arr ? array_map(function ($fd) {
return (int)$fd;
}, $arr) : [];
}
/**

View File

@ -50,7 +50,7 @@ class TalkService extends BaseService
}
$socketFDService = make(SocketFDService::class);
$socketFDService = make(SocketClientService::class);
$runIdAll = $socketFDService->getServerRunIdAll();
$rows = array_map(function ($item) use ($user_id, $socketFDService, $runIdAll) {

View File

@ -1,28 +0,0 @@
<?php
namespace App\Traits;
/**
* Trait WebSocketTrait
*
* @package App\Traits
*/
trait WebSocketTrait
{
/**
* 聊天记录回调事件
*/
public function onTalk()
{
}
/**
* 键盘输入回调事件
*/
public function onInput()
{
}
}

View File

@ -29,6 +29,8 @@ function redis()
/**
* server 实例 基于 swoole server
*
* @return \Swoole\Coroutine\Server|\Swoole\Server
*/
function server()
{
@ -70,6 +72,9 @@ function stdout_log()
/**
* 文件日志
*
* @param string $name
* @return \Psr\Log\LoggerInterface
*/
function logger(string $name = 'APP')
{
@ -164,7 +169,7 @@ function get_media_url(string $path)
*/
function create_image_name(string $ext, int $width, int $height)
{
return uniqid() . Str::random(18) . uniqid() . '_' . $width . 'x' . $height . '.' . $ext;
return uniqid() . Str::random(16) . uniqid() . '_' . $width . 'x' . $height . '.' . $ext;
}
/**