初始化
parent
0e2787f240
commit
bd6fecf39b
|
@ -7,6 +7,7 @@ namespace App\Amqp\Consumer;
|
||||||
use Hyperf\Amqp\Result;
|
use Hyperf\Amqp\Result;
|
||||||
use Hyperf\Amqp\Annotation\Consumer;
|
use Hyperf\Amqp\Annotation\Consumer;
|
||||||
use Hyperf\Amqp\Message\ConsumerMessage;
|
use Hyperf\Amqp\Message\ConsumerMessage;
|
||||||
|
use Hyperf\Redis\Redis;
|
||||||
use PhpAmqpLib\Message\AMQPMessage;
|
use PhpAmqpLib\Message\AMQPMessage;
|
||||||
use Hyperf\Amqp\Message\Type;
|
use Hyperf\Amqp\Message\Type;
|
||||||
use Hyperf\Amqp\Builder\QueueBuilder;
|
use Hyperf\Amqp\Builder\QueueBuilder;
|
||||||
|
@ -66,23 +67,21 @@ class ChatMessageConsumer extends ConsumerMessage
|
||||||
*/
|
*/
|
||||||
public function consumeMessage($data, AMQPMessage $message): string
|
public function consumeMessage($data, AMQPMessage $message): string
|
||||||
{
|
{
|
||||||
echo PHP_EOL . $data;
|
$redis = container()->get(Redis::class);
|
||||||
|
|
||||||
|
//[加锁]防止消息重复消费
|
||||||
|
$lockName = sprintf('ws:message-lock:%s:%s', SERVER_RUN_ID, $data['uuid']);
|
||||||
|
if (!$redis->rawCommand('SET', $lockName, 1, 'NX', 'EX', 120)) {
|
||||||
|
return Result::ACK;
|
||||||
|
}
|
||||||
|
|
||||||
$server = server();
|
$server = server();
|
||||||
foreach (server()->connections as $fd) {
|
foreach (server()->connections as $fd) {
|
||||||
if ($server->isEstablished($fd)) {
|
if ($server->isEstablished($fd)) {
|
||||||
$server->push($fd, "Recv: 我是后台进程 [{$data}]");
|
$server->push($fd, "Recv: 我是后台进程 [{$data['message']}]");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return Result::NACK;
|
return Result::ACK;
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param $data
|
|
||||||
*/
|
|
||||||
public function getClientFds($data)
|
|
||||||
{
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,10 +22,10 @@ class ChatMessageProducer extends ProducerMessage
|
||||||
'sender' => '', //发送者ID
|
'sender' => '', //发送者ID
|
||||||
'receive' => '', //接收者ID
|
'receive' => '', //接收者ID
|
||||||
'receiveType' => '', //接收者类型 1:好友;2:群组
|
'receiveType' => '', //接收者类型 1:好友;2:群组
|
||||||
'message' => []
|
'message' => $data
|
||||||
];
|
];
|
||||||
|
|
||||||
$this->payload = $data;
|
$this->payload = $message;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -35,6 +35,6 @@ class ChatMessageProducer extends ProducerMessage
|
||||||
*/
|
*/
|
||||||
private function uuid()
|
private function uuid()
|
||||||
{
|
{
|
||||||
return Str::random() . rand(100000, 999999);
|
return Str::random() . rand(100000, 999999).uniqid();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,12 +1,12 @@
|
||||||
<?php
|
<?php
|
||||||
|
|
||||||
|
|
||||||
namespace App\Bootstrap;
|
namespace App\Bootstrap;
|
||||||
|
|
||||||
use App\Service\SocketFDService;
|
use App\Service\SocketFDService;
|
||||||
use Hyperf\Framework\Bootstrap\ServerStartCallback;
|
use Hyperf\Framework\Bootstrap\ServerStartCallback;
|
||||||
use Hashids\Hashids;
|
use Hashids\Hashids;
|
||||||
use Hyperf\Di\Annotation\Inject;
|
use Swoole\Timer;
|
||||||
|
use Hyperf\Redis\Redis;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 自定义服务启动前回调事件
|
* 自定义服务启动前回调事件
|
||||||
|
@ -16,28 +16,23 @@ use Hyperf\Di\Annotation\Inject;
|
||||||
*/
|
*/
|
||||||
class ServerStart extends ServerStartCallback
|
class ServerStart extends ServerStartCallback
|
||||||
{
|
{
|
||||||
|
|
||||||
/**
|
|
||||||
* @inject
|
|
||||||
* @var SocketFDService
|
|
||||||
*/
|
|
||||||
private $socketFDService;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 回调事件
|
* 回调事件
|
||||||
*/
|
*/
|
||||||
public function beforeStart()
|
public function beforeStart()
|
||||||
{
|
{
|
||||||
$hashids = new Hashids('', 8, 'abcdefghijklmnopqrstuvwxyz');
|
|
||||||
|
|
||||||
// 服务运行ID
|
// 服务运行ID
|
||||||
define('SERVER_RUN_ID', $hashids->encode(time() . rand(1000, 9999)));
|
define('SERVER_RUN_ID', uniqid());
|
||||||
|
|
||||||
$this->socketFDService->removeRedisCache();
|
|
||||||
|
|
||||||
|
|
||||||
stdout_log()->info(sprintf('服务运行ID : %s', SERVER_RUN_ID));
|
stdout_log()->info(sprintf('服务运行ID : %s', SERVER_RUN_ID));
|
||||||
stdout_log()->info('服务启动前回调事件 : beforeStart ...');
|
|
||||||
|
$this->timer();
|
||||||
|
Timer::tick(15000, function () {
|
||||||
|
$this->timer();
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
public function timer()
|
||||||
|
{
|
||||||
|
container()->get(Redis::class)->hset('SERVER_RUN_ID', SERVER_RUN_ID, time());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,47 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace App\Command;
|
||||||
|
|
||||||
|
use App\Service\SocketFDService;
|
||||||
|
use Hyperf\Command\Command as HyperfCommand;
|
||||||
|
use Hyperf\Command\Annotation\Command;
|
||||||
|
use Psr\Container\ContainerInterface;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @Command
|
||||||
|
*/
|
||||||
|
class RemoveWsCacheCommand extends HyperfCommand
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* @var ContainerInterface
|
||||||
|
*/
|
||||||
|
protected $container;
|
||||||
|
|
||||||
|
public function __construct(ContainerInterface $container)
|
||||||
|
{
|
||||||
|
$this->container = $container;
|
||||||
|
|
||||||
|
parent::__construct('ws:remove-cache');
|
||||||
|
}
|
||||||
|
|
||||||
|
public function configure()
|
||||||
|
{
|
||||||
|
parent::configure();
|
||||||
|
$this->setDescription('清除 WebSocket 客户端 FD 与用户绑定的缓存信息');
|
||||||
|
}
|
||||||
|
|
||||||
|
public function handle()
|
||||||
|
{
|
||||||
|
$socket = new SocketFDService();
|
||||||
|
|
||||||
|
$arr= $socket->getServerRunIdAll(2);
|
||||||
|
|
||||||
|
foreach ($arr as $run_id=>$value){
|
||||||
|
$socket->removeRedisCache($run_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->line('缓存已清除!', 'info');
|
||||||
|
}
|
||||||
|
}
|
|
@ -2,7 +2,6 @@
|
||||||
|
|
||||||
namespace App\Service;
|
namespace App\Service;
|
||||||
|
|
||||||
use Hyperf\Di\Annotation\Inject;
|
|
||||||
use Hyperf\Redis\Redis;
|
use Hyperf\Redis\Redis;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -15,19 +14,23 @@ class SocketFDService
|
||||||
/**
|
/**
|
||||||
* fd与用户绑定(使用hash 做处理)
|
* fd与用户绑定(使用hash 做处理)
|
||||||
*/
|
*/
|
||||||
const BIND_FD_TO_USER = 'socket:fd:user';
|
const BIND_FD_TO_USER = 'ws:fd:user';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 使用集合做处理
|
* 使用集合做处理
|
||||||
*/
|
*/
|
||||||
const BIND_USER_TO_FDS = 'socket:user:fds';
|
const BIND_USER_TO_FDS = 'ws:user:fds';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @inject
|
|
||||||
* @var Redis
|
* @var Redis
|
||||||
*/
|
*/
|
||||||
private $redis;
|
private $redis;
|
||||||
|
|
||||||
|
public function __construct()
|
||||||
|
{
|
||||||
|
$this->redis = container()->get(Redis::class);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 客户端fd与用户ID绑定关系
|
* 客户端fd与用户ID绑定关系
|
||||||
*
|
*
|
||||||
|
@ -59,7 +62,7 @@ class SocketFDService
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 检测用户当前是否在线
|
* 检测用户当前是否在线(指定运行服务器)
|
||||||
*
|
*
|
||||||
* @param int $user_id 用户ID
|
* @param int $user_id 用户ID
|
||||||
* @param string $run_id 服务运行ID(默认当前服务ID)
|
* @param string $run_id 服务运行ID(默认当前服务ID)
|
||||||
|
@ -70,6 +73,24 @@ class SocketFDService
|
||||||
return $this->redis->scard(sprintf('%s:%s:%s', self::BIND_USER_TO_FDS, $run_id, $user_id)) ? true : false;
|
return $this->redis->scard(sprintf('%s:%s:%s', self::BIND_USER_TO_FDS, $run_id, $user_id)) ? true : false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 检测用户当前是否在线(查询所有在线服务器)
|
||||||
|
*
|
||||||
|
* @param int $user_id 用户ID
|
||||||
|
* @param array $run_ids 服务运行ID
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
public function isOnlineAll(int $user_id, array $run_ids = [])
|
||||||
|
{
|
||||||
|
if (empty($run_ids)) $run_ids = $this->getServerRunIdAll();
|
||||||
|
|
||||||
|
foreach ($run_ids as $run_id) {
|
||||||
|
if ($this->isOnline($user_id, $run_id)) return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 查询客户端fd对应的用户ID
|
* 查询客户端fd对应的用户ID
|
||||||
*
|
*
|
||||||
|
@ -91,27 +112,50 @@ class SocketFDService
|
||||||
*/
|
*/
|
||||||
public function findUserFds(int $user_id, $run_id = SERVER_RUN_ID)
|
public function findUserFds(int $user_id, $run_id = SERVER_RUN_ID)
|
||||||
{
|
{
|
||||||
return '';
|
return $this->redis->smembers(sprintf('%s:%s:%s', self::BIND_USER_TO_FDS, $run_id, $user_id));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取服务ID列表
|
||||||
|
*
|
||||||
|
* @param int $type 获取类型[1:正在运行;2:已超时;3:所有]
|
||||||
|
* @return array
|
||||||
|
*/
|
||||||
|
public function getServerRunIdAll(int $type = 1)
|
||||||
|
{
|
||||||
|
$arr = $this->redis->hGetAll('SERVER_RUN_ID');
|
||||||
|
if ($type == 3) return $arr;
|
||||||
|
|
||||||
|
$current_time = time();
|
||||||
|
return array_filter($arr, function ($value) use ($current_time, $type) {
|
||||||
|
if ($type == 1) {
|
||||||
|
return ($current_time - intval($value)) <= 35;
|
||||||
|
} else {
|
||||||
|
return ($current_time - intval($value)) > 35;
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 清除绑定缓存的信息
|
* 清除绑定缓存的信息
|
||||||
*
|
*
|
||||||
* @param string $run_id 服务运行ID(默认当前服务ID)
|
* @param string $run_id 服务运行ID
|
||||||
*/
|
*/
|
||||||
public function removeRedisCache($run_id = SERVER_RUN_ID)
|
public function removeRedisCache(string $run_id)
|
||||||
{
|
{
|
||||||
$this->redis->del(self::BIND_FD_TO_USER);
|
$this->redis->del(sprintf('%s:%s', self::BIND_FD_TO_USER, $run_id));
|
||||||
$prefix = self::BIND_USER_TO_FDS;
|
|
||||||
|
$prefix = sprintf('%s:%s', self::BIND_USER_TO_FDS, $run_id);
|
||||||
|
|
||||||
$iterator = null;
|
$iterator = null;
|
||||||
while (true) {
|
while (true) {
|
||||||
$keys = $this->redis->scan($iterator, "{$prefix}*");
|
$keys = $this->redis->scan($iterator, "{$prefix}*");
|
||||||
if ($keys === false) {
|
|
||||||
return;
|
if ($keys === false) return;
|
||||||
}
|
|
||||||
if (!empty($keys)) {
|
if (!empty($keys)) {
|
||||||
$this->redis->del(...$keys);
|
$this->redis->del(...$keys);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,9 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace App\Service;
|
||||||
|
|
||||||
|
|
||||||
|
class SocketRoomService
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
|
@ -47,7 +47,7 @@ class RedisLock
|
||||||
$redis = self::getRedis();
|
$redis = self::getRedis();
|
||||||
|
|
||||||
do {
|
do {
|
||||||
$acquired = $redis->set(self::getLockKey($key), $requestId, 'NX', 'EX', $lockSecond);
|
$acquired = $redis->rawCommand('SET', self::getLockKey($key), $requestId, 'NX', 'EX', $lockSecond);
|
||||||
if ($acquired) {
|
if ($acquired) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,7 +9,7 @@ use Psr\Http\Message\ServerRequestInterface;
|
||||||
use Swoole\Websocket\Frame;
|
use Swoole\Websocket\Frame;
|
||||||
use Swoole\WebSocket\Server as WebSocketServer;
|
use Swoole\WebSocket\Server as WebSocketServer;
|
||||||
use Hyperf\Utils\Str;
|
use Hyperf\Utils\Str;
|
||||||
|
use Hyperf\Redis\Redis;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 容器实例
|
* 容器实例
|
||||||
|
@ -165,4 +165,4 @@ function get_media_url(string $path)
|
||||||
function create_image_name(string $ext, int $width, int $height)
|
function create_image_name(string $ext, int $width, int $height)
|
||||||
{
|
{
|
||||||
return uniqid() . Str::random(18) . uniqid() . '_' . $width . 'x' . $height . '.' . $ext;
|
return uniqid() . Str::random(18) . uniqid() . '_' . $width . 'x' . $height . '.' . $ext;
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,7 +54,6 @@ return [
|
||||||
'callbacks' => [
|
'callbacks' => [
|
||||||
//自定义启动前事件
|
//自定义启动前事件
|
||||||
SwooleEvent::ON_BEFORE_START => [App\Bootstrap\ServerStart::class, 'beforeStart'],
|
SwooleEvent::ON_BEFORE_START => [App\Bootstrap\ServerStart::class, 'beforeStart'],
|
||||||
|
|
||||||
SwooleEvent::ON_WORKER_START => [Hyperf\Framework\Bootstrap\WorkerStartCallback::class, 'onWorkerStart'],
|
SwooleEvent::ON_WORKER_START => [Hyperf\Framework\Bootstrap\WorkerStartCallback::class, 'onWorkerStart'],
|
||||||
SwooleEvent::ON_PIPE_MESSAGE => [Hyperf\Framework\Bootstrap\PipeMessageCallback::class, 'onPipeMessage'],
|
SwooleEvent::ON_PIPE_MESSAGE => [Hyperf\Framework\Bootstrap\PipeMessageCallback::class, 'onPipeMessage'],
|
||||||
SwooleEvent::ON_WORKER_EXIT => [Hyperf\Framework\Bootstrap\WorkerExitCallback::class, 'onWorkerExit'],
|
SwooleEvent::ON_WORKER_EXIT => [Hyperf\Framework\Bootstrap\WorkerExitCallback::class, 'onWorkerExit'],
|
||||||
|
|
|
@ -19,6 +19,8 @@ return [
|
||||||
|
|
||||||
'ip_address'=>env('IP_ADDRESS', ''),
|
'ip_address'=>env('IP_ADDRESS', ''),
|
||||||
|
|
||||||
|
// 运行模式
|
||||||
|
'run_mode'=>'cluster',
|
||||||
StdoutLoggerInterface::class => [
|
StdoutLoggerInterface::class => [
|
||||||
'log_level' => [
|
'log_level' => [
|
||||||
LogLevel::ALERT,
|
LogLevel::ALERT,
|
||||||
|
|
Loading…
Reference in New Issue