hyperf-chat/app/Cache/Repository/StreamRedis.php

155 lines
3.7 KiB
PHP
Raw Normal View History

2021-05-20 16:53:34 +08:00
<?php
namespace App\Cache\Repository;
use Closure;
2021-05-21 22:56:42 +08:00
use App\Traits\StaticInstance;
2021-05-20 16:53:34 +08:00
use App\Cache\Contracts\StreamRedisInterface;
2021-05-21 22:56:42 +08:00
class StreamRedis extends AbstractRedis implements StreamRedisInterface
2021-05-20 16:53:34 +08:00
{
2021-05-21 22:56:42 +08:00
protected $prefix = 'rds-stream';
2021-05-20 16:53:34 +08:00
2021-05-21 22:56:42 +08:00
protected $name = 'default';
2021-05-20 16:53:34 +08:00
/**
* 添加消息
*
* @param array $messages 消息信息
* @param int $maxLen 消息队列最大长度
* @param false $isApproximate
* @return string
*/
public function add(array $messages, $maxLen = 0, $isApproximate = false)
{
2021-05-21 22:56:42 +08:00
return $this->redis()->xAdd($this->getCacheKey(), '*', $messages, $maxLen, $isApproximate);
2021-05-20 16:53:34 +08:00
}
/**
* 删除消息
*
* @param string ...$id 消息ID
* @return int
*/
public function rem(string ...$id)
{
2021-05-21 22:56:42 +08:00
return $this->redis()->xDel($this->getCacheKey(), $id);
2021-05-20 16:53:34 +08:00
}
/**
* 消费者消息确认
*
* @param string $group 消费组
* @param string $id 消息ID
* @return int
*/
public function ack(string $group, string $id)
{
2021-05-21 22:56:42 +08:00
return $this->redis()->xAck($this->getCacheKey(), $group, [$id]);
2021-05-20 16:53:34 +08:00
}
/**
* 获取消息总数
*
* @return int
*/
public function count()
{
2021-05-21 22:56:42 +08:00
return $this->redis()->xLen($this->getCacheKey());
2021-05-20 16:53:34 +08:00
}
/**
* 获取所有消息
*
* @return array
*/
public function all()
{
2021-05-21 22:56:42 +08:00
return $this->redis()->xRange($this->getCacheKey(), '-', '+');
2021-05-20 16:53:34 +08:00
}
/**
* 清空消息队列
*
* @return bool
*/
public function clear()
{
foreach ($this->all() as $k => $v) {
$this->rem($k);
}
return true;
}
/**
* 删除消息队列 KEY
*
* @return int
*/
public function delete()
{
2021-05-21 22:56:42 +08:00
return $this->redis()->del($this->getCacheKey());
2021-05-20 16:53:34 +08:00
}
/**
* 对消息队列进行修剪,限制长度
*
* @param int $maxLen
* @param bool $isApproximate
* @return int
*/
public function trim(int $maxLen, bool $isApproximate = false)
{
2021-05-21 22:56:42 +08:00
return $this->redis()->xTrim($this->getCacheKey(), $maxLen, $isApproximate);
2021-05-20 16:53:34 +08:00
}
public function group($operation, $group, $msgId = '', $mkStream = false)
{
2021-05-21 22:56:42 +08:00
return $this->redis()->xGroup($operation, $this->getCacheKey(), $group, $msgId, $mkStream);
2021-05-20 16:53:34 +08:00
}
public function pending($group, $start = null, $end = null, $count = null, $consumer = null)
{
2021-05-21 22:56:42 +08:00
return $this->redis()->xPending($this->getCacheKey(), $group, $start, $end, $count, $consumer);
2021-05-20 16:53:34 +08:00
}
/**
* 查看队列信息
*
* @param string $operation [stream:队列信息groups:消费组信息]
* @return mixed
*/
public function info(string $operation = 'stream')
{
2021-05-21 22:56:42 +08:00
return $this->redis()->xInfo($operation, $this->getCacheKey());
2021-05-20 16:53:34 +08:00
}
/**
* 运行消息队列
*
* @param Closure $closure 闭包函数
* @param string $group 消费组
* @param string $consumer 消费者
* @param int $count 同时获取的任务个数
*/
public function run(Closure $closure, string $group, string $consumer, $count = 1)
{
$this->group('create', $group, '0');
while (true) {
2021-05-21 22:56:42 +08:00
$tasks = $this->redis()->xReadGroup($group, $consumer, [$this->getCacheKey() => '>'], $count);
2021-05-20 16:53:34 +08:00
if (empty($tasks)) {
sleep(1);// 获取不到任务,延时一秒
continue;
}
2021-05-21 22:56:42 +08:00
foreach ($tasks[$this->getCacheKey()] as $id => $task) {
2021-05-20 16:53:34 +08:00
if ($closure($id, $task)) {
$this->ack($group, $id);
}
}
}
}
}