hyperf-chat/app/Amqp/Consumer/ChatMessageConsumer.php

88 lines
1.9 KiB
PHP
Raw Normal View History

2020-11-02 22:45:37 +08:00
<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
2020-11-08 17:10:05 +08:00
use Hyperf\Redis\Redis;
2020-11-02 22:45:37 +08:00
use PhpAmqpLib\Message\AMQPMessage;
use Hyperf\Amqp\Message\Type;
use Hyperf\Amqp\Builder\QueueBuilder;
/**
 * AMQP consumer that relays chat messages from the fanout exchange to the
 * WebSocket connections held by this server instance.
 *
 * @Consumer(name="聊天消息消费者",enable=true)
 */
class ChatMessageConsumer extends ConsumerMessage
{
    /**
     * Lifetime, in seconds, of the Redis key used to de-duplicate messages.
     */
    private const LOCK_TTL = 120;

    /**
     * Exchange name.
     *
     * @var string
     */
    public $exchange = 'im.message.fanout';

    /**
     * Exchange type.
     *
     * @var string
     */
    public $type = Type::FANOUT;

    /**
     * Routing key.
     *
     * @var string
     */
    public $routingKey = 'consumer:im:message';

    /**
     * ChatMessageConsumer constructor.
     *
     * Sets a queue name that embeds SERVER_RUN_ID, so the queue name differs
     * for each distinct SERVER_RUN_ID value.
     */
    public function __construct()
    {
        $this->setQueue('queue:im-message' . SERVER_RUN_ID);
    }

    /**
     * Overrides the queue builder so the queue is declared as auto-delete.
     *
     * @return QueueBuilder
     */
    public function getQueueBuilder(): QueueBuilder
    {
        return parent::getQueueBuilder()->setAutoDelete(true);
    }

    /**
     * Consumes one queue message and pushes it to every established
     * WebSocket connection on this server.
     *
     * @param mixed       $data    Decoded message body; expected to be an array
     *                             carrying the keys "uuid" and "message".
     * @param AMQPMessage $message Raw AMQP message.
     * @return string One of the Result::* constants.
     */
    public function consumeMessage($data, AMQPMessage $message): string
    {
        // A payload without the expected keys can never be delivered.
        // Acknowledge it so the broker discards it, instead of triggering
        // undefined-index errors and locking on an empty UUID.
        if (!is_array($data) || !isset($data['uuid'], $data['message'])) {
            return Result::ACK;
        }

        $redis = container()->get(Redis::class);

        // [Lock] Guard against consuming the same message more than once:
        // SET ... NX only succeeds for the first caller within the TTL.
        $lockName = sprintf('ws:message-lock:%s:%s', SERVER_RUN_ID, $data['uuid']);
        if (!$redis->rawCommand('SET', $lockName, 1, 'NX', 'EX', self::LOCK_TTL)) {
            return Result::ACK;
        }

        // The outgoing frame is identical for every connection; build it once.
        $payload = "Recv: 我是后台进程 [{$data['message']}]";

        $server = server();
        foreach ($server->connections as $fd) {
            // Skip connections that have not completed the WebSocket handshake.
            if ($server->isEstablished($fd)) {
                $server->push($fd, $payload);
            }
        }

        return Result::ACK;
    }
}