hyperf-chat/app/Cache/Repository/StreamRedis.php

156 lines
3.6 KiB
PHP
Raw Normal View History

2021-05-20 16:53:34 +08:00
<?php
namespace App\Cache\Repository;
use Closure;
use App\Cache\Contracts\StreamRedisInterface;
class StreamRedis implements StreamRedisInterface
{
use RedisTrait;
private $prefix = 'rds:stream';
private $name = 'default';
/**
* 添加消息
*
* @param array $messages 消息信息
* @param int $maxLen 消息队列最大长度
* @param false $isApproximate
* @return string
*/
public function add(array $messages, $maxLen = 0, $isApproximate = false)
{
return $this->redis()->xAdd($this->getKeyName(), '*', $messages, $maxLen, $isApproximate);
}
/**
* 删除消息
*
* @param string ...$id 消息ID
* @return int
*/
public function rem(string ...$id)
{
return $this->redis()->xDel($this->getKeyName(), $id);
}
/**
* 消费者消息确认
*
* @param string $group 消费组
* @param string $id 消息ID
* @return int
*/
public function ack(string $group, string $id)
{
return $this->redis()->xAck($this->getKeyName(), $group, [$id]);
}
/**
* 获取消息总数
*
* @return int
*/
public function count()
{
return $this->redis()->xLen($this->getKeyName());
}
/**
* 获取所有消息
*
* @return array
*/
public function all()
{
return $this->redis()->xRange($this->getKeyName(), '-', '+');
}
/**
* 清空消息队列
*
* @return bool
*/
public function clear()
{
foreach ($this->all() as $k => $v) {
$this->rem($k);
}
return true;
}
/**
* 删除消息队列 KEY
*
* @return int
*/
public function delete()
{
return $this->redis()->del($this->getKeyName());
}
/**
* 对消息队列进行修剪,限制长度
*
* @param int $maxLen
* @param bool $isApproximate
* @return int
*/
public function trim(int $maxLen, bool $isApproximate = false)
{
return $this->redis()->xTrim($this->getKeyName(), $maxLen, $isApproximate);
}
public function group($operation, $group, $msgId = '', $mkStream = false)
{
return $this->redis()->xGroup($operation, $this->getKeyName(), $group, $msgId, $mkStream);
}
public function pending($group, $start = null, $end = null, $count = null, $consumer = null)
{
return $this->redis()->xPending($this->getKeyName(), $group, $start, $end, $count, $consumer);
}
/**
* 查看队列信息
*
* @param string $operation [stream:队列信息groups:消费组信息]
* @return mixed
*/
public function info(string $operation = 'stream')
{
return $this->redis()->xInfo($operation, $this->getKeyName());
}
/**
* 运行消息队列
*
* @param Closure $closure 闭包函数
* @param string $group 消费组
* @param string $consumer 消费者
* @param int $count 同时获取的任务个数
*/
public function run(Closure $closure, string $group, string $consumer, $count = 1)
{
$this->group('create', $group, '0');
while (true) {
$tasks = $this->redis()->xReadGroup($group, $consumer, [$this->getKeyName() => '>'], $count);
if (empty($tasks)) {
sleep(1);// 获取不到任务,延时一秒
continue;
}
foreach ($tasks[$this->getKeyName()] as $id => $task) {
if ($closure($id, $task)) {
$this->ack($group, $id);
}
}
}
}
}