初始化

main
gzydong 2020-11-20 19:17:11 +08:00
parent 2a7e886a70
commit 7474156cdb
11 changed files with 290 additions and 24 deletions

View File

@ -11,6 +11,7 @@ use Hyperf\Redis\Redis;
use PhpAmqpLib\Message\AMQPMessage;
use Hyperf\Amqp\Message\Type;
use Hyperf\Amqp\Builder\QueueBuilder;
use Hyperf\Utils\Coroutine;
/**
* @Consumer(name="聊天消息消费者",enable=true)
@ -43,7 +44,7 @@ class ChatMessageConsumer extends ConsumerMessage
*/
public function __construct()
{
$this->setQueue('queue:im-message' . SERVER_RUN_ID);
$this->setQueue('queue:im-message:' . SERVER_RUN_ID);
}
/**
@ -67,11 +68,13 @@ class ChatMessageConsumer extends ConsumerMessage
*/
public function consumeMessage($data, AMQPMessage $message): string
{
go(function() use ($data) {
$redis = container()->get(Redis::class);
//[加锁]防止消息重复消费
$lockName = sprintf('ws:message-lock:%s:%s', SERVER_RUN_ID, $data['uuid']);
if (!$redis->rawCommand('SET', $lockName, 1, 'NX', 'EX', 120)) {
if (!$redis->rawCommand('SET', $lockName, 1, 'NX', 'EX', 60)) {
return Result::ACK;
}
@ -81,6 +84,8 @@ class ChatMessageConsumer extends ConsumerMessage
$server->push($fd, "Recv: 我是后台进程 [{$data['message']}]");
}
}
});
return Result::ACK;
}

View File

@ -2,6 +2,8 @@
namespace App\Bootstrap;
use App\Support\Packet;
use App\Support\SocketIOParser;
use Hyperf\Framework\Bootstrap\ServerStartCallback;
use Swoole\Timer;
use Hyperf\Redis\Redis;

View File

@ -155,8 +155,8 @@ class TalkController extends CController
]);
return UsersChatList::topItem($this->uid(), $params['list_id'], $params['type'] == 1)
? $this->response->success([], '对话列表置顶成功...')
: $this->response->fail('对话列表置顶失败...');
? $this->response->success([], '对话列表置顶(或取消置顶)成功...')
: $this->response->fail('对话列表置顶(或取消置顶)失败...');
}
/**

View File

@ -32,6 +32,12 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
*/
private $jwt;
/**
* @Inject
* @var Producer
*/
private $producer;
/**
* @inject
* @var SocketFDService
@ -69,9 +75,7 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
// 判断是否为心跳检测
if ($frame->data == 'PING') return;
$ip = config('ip_address');
$producer = container()->get(Producer::class);
$producer->produce(new ChatMessageProducer("我是来自[{$ip} 服务器的消息]{$frame->data}"));
$this->producer->produce(new ChatMessageProducer("我是来自 xxxx 服务器的消息]{$frame->data}"));
}
/**
@ -85,8 +89,7 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
{
$user_id = $this->socketFDService->findFdUserId($fd);
stdout_log()->notice("客户端FD:{$fd} 已关闭连接,用户ID为【{$user_id}");
stdout_log()->notice('关闭时间:' . date('Y-m-d H:i:s'));
stdout_log()->notice("客户端FD:{$fd} 已关闭连接 用户ID为【{$user_id}】,关闭时间:" . date('Y-m-d H:i:s'));
// 解除fd关系
$this->socketFDService->removeRelation($fd);

View File

@ -16,8 +16,6 @@ class JwtAuthExceptionHandler extends ExceptionHandler
{
public function handle(Throwable $throwable, ResponseInterface $response)
{
// echo $throwable->getMessage();
// 判断被捕获到的异常是希望被捕获的异常
if ($throwable instanceof TokenValidException) {
// 格式化输出

View File

@ -52,6 +52,6 @@ class EmoticonDetail extends BaseModel
'emoticon_id' => 'integer',
'user_id' => 'integer',
'file_size' => 'integer',
'created_at' => 'datetime'
'created_at' => 'integer'
];
}

View File

@ -41,4 +41,14 @@ class UsersEmoticon extends BaseModel
'id' => 'integer',
'user_id' => 'integer'
];
/**
*
* @param string $value
* @return string
*/
public function getEmoticonIdsAttribute($value)
{
return explode(',', $value);
}
}

View File

@ -14,10 +14,46 @@ namespace App\Process;
use Hyperf\AsyncQueue\Process\ConsumerProcess;
use Hyperf\Process\Annotation\Process;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
///**
// * @Process
// */
class AsyncQueueConsumer extends ConsumerProcess
{
public function handle(): void
{
//建立一个到RabbitMQ服务器的连接
$connection = new AMQPStreamConnection('47.105.180.123', 5672, 'yuandong', 'yuandong','im');
$channel = $connection->channel();
$channel->exchange_declare('im.message.fanout', 'fanout', true, false, false);
list($queue_name, ,) = $channel->queue_declare("test", false, false, true, true);
$channel->queue_bind($queue_name, 'im.message.fanout','routing-key-test');
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
$callback = function($msg){
$data = json_decode($msg->body,true);
$server = server();
foreach ($server->connections as $fd) {
if ($server->exist($fd) && $server->isEstablished($fd)) {
$server->push($fd, "Recv: 我是后台进程 [{$data['message']}]");
}
}
};
$channel->basic_consume($queue_name, 'asfafa', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
}
}

171
app/Support/Packet.php Normal file
View File

@ -0,0 +1,171 @@
<?php
namespace App\Support;
/**
* Class Packet
*/
class Packet
{
/**
* Socket.io packet type `open`.
*/
const OPEN = 0;
/**
* Socket.io packet type `close`.
*/
const CLOSE = 1;
/**
* Socket.io packet type `ping`.
*/
const PING = 2;
/**
* Socket.io packet type `pong`.
*/
const PONG = 3;
/**
* Socket.io packet type `message`.
*/
const MESSAGE = 4;
/**
* Socket.io packet type 'upgrade'
*/
const UPGRADE = 5;
/**
* Socket.io packet type `noop`.
*/
const NOOP = 6;
/**
* Engine.io packet type `connect`.
*/
const CONNECT = 0;
/**
* Engine.io packet type `disconnect`.
*/
const DISCONNECT = 1;
/**
* Engine.io packet type `event`.
*/
const EVENT = 2;
/**
* Engine.io packet type `ack`.
*/
const ACK = 3;
/**
* Engine.io packet type `error`.
*/
const ERROR = 4;
/**
* Engine.io packet type 'binary event'
*/
const BINARY_EVENT = 5;
/**
* Engine.io packet type `binary ack`. For acks with binary arguments.
*/
const BINARY_ACK = 6;
/**
* Socket.io packet types.
*/
public static $socketTypes = [
0 => 'OPEN',
1 => 'CLOSE',
2 => 'PING',
3 => 'PONG',
4 => 'MESSAGE',
5 => 'UPGRADE',
6 => 'NOOP',
];
/**
* Engine.io packet types.
*/
public static $engineTypes = [
0 => 'CONNECT',
1 => 'DISCONNECT',
2 => 'EVENT',
3 => 'ACK',
4 => 'ERROR',
5 => 'BINARY_EVENT',
6 => 'BINARY_ACK',
];
/**
* Get socket packet type of a raw payload.
*
* @param string $packet
*
* @return int|null
*/
public static function getSocketType(string $packet)
{
$type = $packet[0] ?? null;
if (!array_key_exists($type, static::$socketTypes)) {
return null;
}
return (int)$type;
}
/**
* Get data packet from a raw payload.
*
* @param string $packet
*
* @return array|null
*/
public static function getPayload(string $packet)
{
$packet = trim($packet);
$start = strpos($packet, '[');
if ($start === false || substr($packet, -1) !== ']') {
return null;
}
$data = substr($packet, $start, strlen($packet) - $start);
$data = json_decode($data, true);
if (is_null($data)) {
return null;
}
return [
'event' => $data[0],
'data' => $data[1] ?? null,
];
}
/**
* Return if a socket packet belongs to specific type.
*
* @param $packet
* @param string $typeName
*
* @return bool
*/
public static function isSocketType($packet, string $typeName)
{
$type = array_search(strtoupper($typeName), static::$socketTypes);
if ($type === false) {
return false;
}
return static::getSocketType($packet) === $type;
}
}

View File

@ -0,0 +1,41 @@
<?php
namespace App\Support;
class SocketIOParser extends Packet
{
/**
* Encode output payload for websocket push.
*
* @param string $event
* @param mixed $data
*
* @return mixed
*/
public static function encode(string $event, $data)
{
$packet = Packet::MESSAGE . Packet::EVENT;
$shouldEncode = is_array($data) || is_object($data);
$data = $shouldEncode ? json_encode($data) : $data;
$format = $shouldEncode ? '["%s",%s]' : '["%s","%s"]';
return $packet . sprintf($format, $event, $data);
}
/**
* Decode message from websocket client.
* Define and return payload here.
*
* @param \Swoole\Websocket\Frame $frame
*
* @return array
*/
public static function decode($frame)
{
$payload = Packet::getPayload($frame->data);
return [
'event' => $payload['event'] ?? null,
'data' => $payload['data'] ?? null,
];
}
}

View File

@ -46,12 +46,12 @@ return [
],
'settings' => [
'enable_coroutine' => true,
'worker_num' => 1,
'worker_num' => swoole_cpu_num(),
'pid_file' => BASE_PATH . '/runtime/hyperf.pid',
'open_tcp_nodelay' => true,
'max_coroutine' => 100000,
'open_http2_protocol' => true,
'max_request' => 1000,
'max_request' => 10000,
'socket_buffer_size' => 3 * 1024 * 1024,
'buffer_output_size' => 3 * 1024 * 1024,
'package_max_length'=> 10 * 1024 * 1024,