From 7474156cdb705109cb8dcef80dd54490f1ebcb19 Mon Sep 17 00:00:00 2001 From: gzydong Date: Fri, 20 Nov 2020 19:17:11 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/Amqp/Consumer/ChatMessageConsumer.php | 29 +-- app/Bootstrap/ServerStart.php | 2 + app/Controller/Api/V1/TalkController.php | 4 +- app/Controller/WebSocketController.php | 13 +- .../Handler/JwtAuthExceptionHandler.php | 2 - app/Model/EmoticonDetail.php | 2 +- app/Model/UsersEmoticon.php | 10 + app/Process/AsyncQueueConsumer.php | 36 ++++ app/Support/Packet.php | 171 ++++++++++++++++++ app/Support/SocketIOParser.php | 41 +++++ config/autoload/server.php | 4 +- 11 files changed, 290 insertions(+), 24 deletions(-) create mode 100644 app/Support/Packet.php create mode 100644 app/Support/SocketIOParser.php diff --git a/app/Amqp/Consumer/ChatMessageConsumer.php b/app/Amqp/Consumer/ChatMessageConsumer.php index a35c956..2765b09 100644 --- a/app/Amqp/Consumer/ChatMessageConsumer.php +++ b/app/Amqp/Consumer/ChatMessageConsumer.php @@ -11,6 +11,7 @@ use Hyperf\Redis\Redis; use PhpAmqpLib\Message\AMQPMessage; use Hyperf\Amqp\Message\Type; use Hyperf\Amqp\Builder\QueueBuilder; +use Hyperf\Utils\Coroutine; /** * @Consumer(name="聊天消息消费者",enable=true) @@ -43,7 +44,7 @@ class ChatMessageConsumer extends ConsumerMessage */ public function __construct() { - $this->setQueue('queue:im-message' . SERVER_RUN_ID); + $this->setQueue('queue:im-message:' . SERVER_RUN_ID); } /** @@ -67,20 +68,24 @@ class ChatMessageConsumer extends ConsumerMessage */ public function consumeMessage($data, AMQPMessage $message): string { - $redis = container()->get(Redis::class); - //[加锁]防止消息重复消费 - $lockName = sprintf('ws:message-lock:%s:%s', SERVER_RUN_ID, $data['uuid']); - if (!$redis->rawCommand('SET', $lockName, 1, 'NX', 'EX', 120)) { - return Result::ACK; - } + go(function() use ($data) { + $redis = container()->get(Redis::class); - $server = server(); - foreach ($server->connections as $fd) { - if ($server->exist($fd) && $server->isEstablished($fd)) { - $server->push($fd, "Recv: 我是后台进程 [{$data['message']}]"); + //[加锁]防止消息重复消费 + $lockName = sprintf('ws:message-lock:%s:%s', SERVER_RUN_ID, $data['uuid']); + if (!$redis->rawCommand('SET', $lockName, 1, 'NX', 'EX', 60)) { + return Result::ACK; } - } + + $server = server(); + foreach ($server->connections as $fd) { + if ($server->exist($fd) && $server->isEstablished($fd)) { + $server->push($fd, "Recv: 我是后台进程 [{$data['message']}]"); + } + } + }); + return Result::ACK; } diff --git a/app/Bootstrap/ServerStart.php b/app/Bootstrap/ServerStart.php index c355be3..f0b6949 100644 --- a/app/Bootstrap/ServerStart.php +++ b/app/Bootstrap/ServerStart.php @@ -2,6 +2,8 @@ namespace App\Bootstrap; +use App\Support\Packet; +use App\Support\SocketIOParser; use Hyperf\Framework\Bootstrap\ServerStartCallback; use Swoole\Timer; use Hyperf\Redis\Redis; diff --git a/app/Controller/Api/V1/TalkController.php b/app/Controller/Api/V1/TalkController.php index 058dc8d..70fab1a 100644 --- a/app/Controller/Api/V1/TalkController.php +++ b/app/Controller/Api/V1/TalkController.php @@ -155,8 +155,8 @@ class TalkController extends CController ]); return UsersChatList::topItem($this->uid(), $params['list_id'], $params['type'] == 1) - ? $this->response->success([], '对话列表置顶成功...') - : $this->response->fail('对话列表置顶失败...'); + ? $this->response->success([], '对话列表置顶(或取消置顶)成功...') + : $this->response->fail('对话列表置顶(或取消置顶)失败...'); } /** diff --git a/app/Controller/WebSocketController.php b/app/Controller/WebSocketController.php index 1e67a5b..77c138d 100644 --- a/app/Controller/WebSocketController.php +++ b/app/Controller/WebSocketController.php @@ -32,6 +32,12 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos */ private $jwt; + /** + * @Inject + * @var Producer + */ + private $producer; + /** * @inject * @var SocketFDService @@ -69,9 +75,7 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos // 判断是否为心跳检测 if ($frame->data == 'PING') return; - $ip = config('ip_address'); - $producer = container()->get(Producer::class); - $producer->produce(new ChatMessageProducer("我是来自[{$ip} 服务器的消息],{$frame->data}")); + $this->producer->produce(new ChatMessageProducer("我是来自 xxxx 服务器的消息],{$frame->data}")); } /** @@ -85,8 +89,7 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos { $user_id = $this->socketFDService->findFdUserId($fd); - stdout_log()->notice("客户端FD:{$fd} 已关闭连接,用户ID为【{$user_id}】"); - stdout_log()->notice('关闭时间:' . date('Y-m-d H:i:s')); + stdout_log()->notice("客户端FD:{$fd} 已关闭连接 ,用户ID为【{$user_id}】,关闭时间:" . date('Y-m-d H:i:s')); // 解除fd关系 $this->socketFDService->removeRelation($fd); diff --git a/app/Exception/Handler/JwtAuthExceptionHandler.php b/app/Exception/Handler/JwtAuthExceptionHandler.php index 130c6e6..1a14710 100644 --- a/app/Exception/Handler/JwtAuthExceptionHandler.php +++ b/app/Exception/Handler/JwtAuthExceptionHandler.php @@ -16,8 +16,6 @@ class JwtAuthExceptionHandler extends ExceptionHandler { public function handle(Throwable $throwable, ResponseInterface $response) { - -// echo $throwable->getMessage(); // 判断被捕获到的异常是希望被捕获的异常 if ($throwable instanceof TokenValidException) { // 格式化输出 diff --git a/app/Model/EmoticonDetail.php b/app/Model/EmoticonDetail.php index 0bfde8a..7838eae 100644 --- a/app/Model/EmoticonDetail.php +++ b/app/Model/EmoticonDetail.php @@ -52,6 +52,6 @@ class EmoticonDetail extends BaseModel 'emoticon_id' => 'integer', 'user_id' => 'integer', 'file_size' => 'integer', - 'created_at' => 'datetime' + 'created_at' => 'integer' ]; } diff --git a/app/Model/UsersEmoticon.php b/app/Model/UsersEmoticon.php index d15b715..656a4b7 100644 --- a/app/Model/UsersEmoticon.php +++ b/app/Model/UsersEmoticon.php @@ -41,4 +41,14 @@ class UsersEmoticon extends BaseModel 'id' => 'integer', 'user_id' => 'integer' ]; + + /** + * + * @param string $value + * @return string + */ + public function getEmoticonIdsAttribute($value) + { + return explode(',', $value); + } } diff --git a/app/Process/AsyncQueueConsumer.php b/app/Process/AsyncQueueConsumer.php index 084f5d7..4a6f861 100644 --- a/app/Process/AsyncQueueConsumer.php +++ b/app/Process/AsyncQueueConsumer.php @@ -14,10 +14,46 @@ namespace App\Process; use Hyperf\AsyncQueue\Process\ConsumerProcess; use Hyperf\Process\Annotation\Process; +use PhpAmqpLib\Connection\AMQPStreamConnection; +use PhpAmqpLib\Message\AMQPMessage; + ///** // * @Process // */ class AsyncQueueConsumer extends ConsumerProcess { + public function handle(): void + { + //建立一个到RabbitMQ服务器的连接 + $connection = new AMQPStreamConnection('47.105.180.123', 5672, 'yuandong', 'yuandong','im'); + $channel = $connection->channel(); + + $channel->exchange_declare('im.message.fanout', 'fanout', true, false, false); + + list($queue_name, ,) = $channel->queue_declare("test", false, false, true, true); + + $channel->queue_bind($queue_name, 'im.message.fanout','routing-key-test'); + + echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; + + $callback = function($msg){ + $data = json_decode($msg->body,true); + $server = server(); + foreach ($server->connections as $fd) { + if ($server->exist($fd) && $server->isEstablished($fd)) { + $server->push($fd, "Recv: 我是后台进程 [{$data['message']}]"); + } + } + }; + + $channel->basic_consume($queue_name, 'asfafa', false, true, false, false, $callback); + + while(count($channel->callbacks)) { + $channel->wait(); + } + + $channel->close(); + $connection->close(); + } } diff --git a/app/Support/Packet.php b/app/Support/Packet.php new file mode 100644 index 0000000..c990245 --- /dev/null +++ b/app/Support/Packet.php @@ -0,0 +1,171 @@ + 'OPEN', + 1 => 'CLOSE', + 2 => 'PING', + 3 => 'PONG', + 4 => 'MESSAGE', + 5 => 'UPGRADE', + 6 => 'NOOP', + ]; + + /** + * Engine.io packet types. + */ + public static $engineTypes = [ + 0 => 'CONNECT', + 1 => 'DISCONNECT', + 2 => 'EVENT', + 3 => 'ACK', + 4 => 'ERROR', + 5 => 'BINARY_EVENT', + 6 => 'BINARY_ACK', + ]; + + /** + * Get socket packet type of a raw payload. + * + * @param string $packet + * + * @return int|null + */ + public static function getSocketType(string $packet) + { + $type = $packet[0] ?? null; + + if (!array_key_exists($type, static::$socketTypes)) { + return null; + } + + return (int)$type; + } + + /** + * Get data packet from a raw payload. + * + * @param string $packet + * + * @return array|null + */ + public static function getPayload(string $packet) + { + $packet = trim($packet); + $start = strpos($packet, '['); + + if ($start === false || substr($packet, -1) !== ']') { + return null; + } + + $data = substr($packet, $start, strlen($packet) - $start); + $data = json_decode($data, true); + + if (is_null($data)) { + return null; + } + + return [ + 'event' => $data[0], + 'data' => $data[1] ?? null, + ]; + } + + /** + * Return if a socket packet belongs to specific type. + * + * @param $packet + * @param string $typeName + * + * @return bool + */ + public static function isSocketType($packet, string $typeName) + { + $type = array_search(strtoupper($typeName), static::$socketTypes); + + if ($type === false) { + return false; + } + + return static::getSocketType($packet) === $type; + } +} diff --git a/app/Support/SocketIOParser.php b/app/Support/SocketIOParser.php new file mode 100644 index 0000000..bf563a1 --- /dev/null +++ b/app/Support/SocketIOParser.php @@ -0,0 +1,41 @@ +data); + + return [ + 'event' => $payload['event'] ?? null, + 'data' => $payload['data'] ?? null, + ]; + } +} diff --git a/config/autoload/server.php b/config/autoload/server.php index 3e3f3cd..3248d6a 100644 --- a/config/autoload/server.php +++ b/config/autoload/server.php @@ -46,12 +46,12 @@ return [ ], 'settings' => [ 'enable_coroutine' => true, - 'worker_num' => 1, + 'worker_num' => swoole_cpu_num(), 'pid_file' => BASE_PATH . '/runtime/hyperf.pid', 'open_tcp_nodelay' => true, 'max_coroutine' => 100000, 'open_http2_protocol' => true, - 'max_request' => 1000, + 'max_request' => 10000, 'socket_buffer_size' => 3 * 1024 * 1024, 'buffer_output_size' => 3 * 1024 * 1024, 'package_max_length'=> 10 * 1024 * 1024,