2021-05-20 16:53:34 +08:00
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
namespace App\Cache\Repository;
|
|
|
|
|
|
|
|
|
|
use Closure;
|
2021-05-21 22:56:42 +08:00
|
|
|
|
use App\Traits\StaticInstance;
|
2021-05-20 16:53:34 +08:00
|
|
|
|
use App\Cache\Contracts\StreamRedisInterface;
|
|
|
|
|
|
2021-05-21 22:56:42 +08:00
|
|
|
|
class StreamRedis extends AbstractRedis implements StreamRedisInterface
|
2021-05-20 16:53:34 +08:00
|
|
|
|
{
|
2021-05-21 22:56:42 +08:00
|
|
|
|
protected $prefix = 'rds-stream';
|
2021-05-20 16:53:34 +08:00
|
|
|
|
|
2021-05-21 22:56:42 +08:00
|
|
|
|
protected $name = 'default';
|
2021-05-20 16:53:34 +08:00
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 添加消息
|
|
|
|
|
*
|
|
|
|
|
* @param array $messages 消息信息
|
|
|
|
|
* @param int $maxLen 消息队列最大长度
|
|
|
|
|
* @param false $isApproximate
|
|
|
|
|
* @return string
|
|
|
|
|
*/
|
|
|
|
|
public function add(array $messages, $maxLen = 0, $isApproximate = false)
|
|
|
|
|
{
|
2021-05-21 22:56:42 +08:00
|
|
|
|
return $this->redis()->xAdd($this->getCacheKey(), '*', $messages, $maxLen, $isApproximate);
|
2021-05-20 16:53:34 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 删除消息
|
|
|
|
|
*
|
|
|
|
|
* @param string ...$id 消息ID
|
|
|
|
|
* @return int
|
|
|
|
|
*/
|
|
|
|
|
public function rem(string ...$id)
|
|
|
|
|
{
|
2021-05-21 22:56:42 +08:00
|
|
|
|
return $this->redis()->xDel($this->getCacheKey(), $id);
|
2021-05-20 16:53:34 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 消费者消息确认
|
|
|
|
|
*
|
|
|
|
|
* @param string $group 消费组
|
|
|
|
|
* @param string $id 消息ID
|
|
|
|
|
* @return int
|
|
|
|
|
*/
|
|
|
|
|
public function ack(string $group, string $id)
|
|
|
|
|
{
|
2021-05-21 22:56:42 +08:00
|
|
|
|
return $this->redis()->xAck($this->getCacheKey(), $group, [$id]);
|
2021-05-20 16:53:34 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 获取消息总数
|
|
|
|
|
*
|
|
|
|
|
* @return int
|
|
|
|
|
*/
|
|
|
|
|
public function count()
|
|
|
|
|
{
|
2021-05-21 22:56:42 +08:00
|
|
|
|
return $this->redis()->xLen($this->getCacheKey());
|
2021-05-20 16:53:34 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 获取所有消息
|
|
|
|
|
*
|
|
|
|
|
* @return array
|
|
|
|
|
*/
|
|
|
|
|
public function all()
|
|
|
|
|
{
|
2021-05-21 22:56:42 +08:00
|
|
|
|
return $this->redis()->xRange($this->getCacheKey(), '-', '+');
|
2021-05-20 16:53:34 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 清空消息队列
|
|
|
|
|
*
|
|
|
|
|
* @return bool
|
|
|
|
|
*/
|
|
|
|
|
public function clear()
|
|
|
|
|
{
|
|
|
|
|
foreach ($this->all() as $k => $v) {
|
|
|
|
|
$this->rem($k);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 删除消息队列 KEY
|
|
|
|
|
*
|
|
|
|
|
* @return int
|
|
|
|
|
*/
|
|
|
|
|
public function delete()
|
|
|
|
|
{
|
2021-05-21 22:56:42 +08:00
|
|
|
|
return $this->redis()->del($this->getCacheKey());
|
2021-05-20 16:53:34 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 对消息队列进行修剪,限制长度
|
|
|
|
|
*
|
|
|
|
|
* @param int $maxLen
|
|
|
|
|
* @param bool $isApproximate
|
|
|
|
|
* @return int
|
|
|
|
|
*/
|
|
|
|
|
public function trim(int $maxLen, bool $isApproximate = false)
|
|
|
|
|
{
|
2021-05-21 22:56:42 +08:00
|
|
|
|
return $this->redis()->xTrim($this->getCacheKey(), $maxLen, $isApproximate);
|
2021-05-20 16:53:34 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public function group($operation, $group, $msgId = '', $mkStream = false)
|
|
|
|
|
{
|
2021-05-21 22:56:42 +08:00
|
|
|
|
return $this->redis()->xGroup($operation, $this->getCacheKey(), $group, $msgId, $mkStream);
|
2021-05-20 16:53:34 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public function pending($group, $start = null, $end = null, $count = null, $consumer = null)
|
|
|
|
|
{
|
2021-05-21 22:56:42 +08:00
|
|
|
|
return $this->redis()->xPending($this->getCacheKey(), $group, $start, $end, $count, $consumer);
|
2021-05-20 16:53:34 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 查看队列信息
|
|
|
|
|
*
|
|
|
|
|
* @param string $operation [stream:队列信息,groups:消费组信息]
|
|
|
|
|
* @return mixed
|
|
|
|
|
*/
|
|
|
|
|
public function info(string $operation = 'stream')
|
|
|
|
|
{
|
2021-05-21 22:56:42 +08:00
|
|
|
|
return $this->redis()->xInfo($operation, $this->getCacheKey());
|
2021-05-20 16:53:34 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 运行消息队列
|
|
|
|
|
*
|
|
|
|
|
* @param Closure $closure 闭包函数
|
|
|
|
|
* @param string $group 消费组
|
|
|
|
|
* @param string $consumer 消费者
|
|
|
|
|
* @param int $count 同时获取的任务个数
|
|
|
|
|
*/
|
|
|
|
|
public function run(Closure $closure, string $group, string $consumer, $count = 1)
|
|
|
|
|
{
|
|
|
|
|
$this->group('create', $group, '0');
|
|
|
|
|
|
|
|
|
|
while (true) {
|
2021-05-21 22:56:42 +08:00
|
|
|
|
$tasks = $this->redis()->xReadGroup($group, $consumer, [$this->getCacheKey() => '>'], $count);
|
2021-05-20 16:53:34 +08:00
|
|
|
|
if (empty($tasks)) {
|
|
|
|
|
sleep(1);// 获取不到任务,延时一秒
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
2021-05-21 22:56:42 +08:00
|
|
|
|
foreach ($tasks[$this->getCacheKey()] as $id => $task) {
|
2021-05-20 16:53:34 +08:00
|
|
|
|
if ($closure($id, $task)) {
|
|
|
|
|
$this->ack($group, $id);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|