redis()->xAdd($this->getKeyName(), '*', $messages, $maxLen, $isApproximate);
    }

    /**
     * Delete one or more messages from the queue.
     *
     * @param string ...$id IDs of the messages to delete
     * @return int whatever xDel() returns (presumably the number of entries removed)
     */
    public function rem(string ...$id)
    {
        return $this->redis()->xDel($this->getKeyName(), $id);
    }

    /**
     * Acknowledge a message on behalf of a consumer group.
     *
     * @param string $group consumer group name
     * @param string $id    ID of the message to acknowledge
     * @return int whatever xAck() returns (presumably the number of messages acknowledged)
     */
    public function ack(string $group, string $id)
    {
        return $this->redis()->xAck($this->getKeyName(), $group, [$id]);
    }

    /**
     * Get the total number of messages in the queue.
     *
     * @return int
     */
    public function count()
    {
        return $this->redis()->xLen($this->getKeyName());
    }

    /**
     * Fetch every message in the queue (full range, from '-' to '+').
     *
     * @return array messages as returned by xRange(); clear() and run() treat the keys as message IDs
     */
    public function all()
    {
        return $this->redis()->xRange($this->getKeyName(), '-', '+');
    }

    /**
     * Remove every message from the queue while keeping the key itself.
     *
     * IDs are deleted in batches rather than one call per message, so the
     * number of round trips to Redis stays small for long queues.
     *
     * @return bool always true
     */
    public function clear()
    {
        $messages = $this->all();

        // Nothing to delete (empty queue, or the range query returned no usable result).
        if (empty($messages) || !is_array($messages)) {
            return true;
        }

        // The message IDs are the array keys; normalise them to strings for rem().
        $ids = array_map('strval', array_keys($messages));

        // Bound the size of each delete call so a huge queue does not build one giant command.
        foreach (array_chunk($ids, 1000) as $batch) {
            $this->rem(...$batch);
        }

        return true;
    }

    /**
     * Delete the queue key itself.
     *
     * @return int whatever del() returns (presumably the number of keys removed)
     */
    public function delete()
    {
        return $this->redis()->del($this->getKeyName());
    }

    /**
     * Trim the queue to limit its length.
     *
     * @param int  $maxLen        maximum length to keep
     * @param bool $isApproximate forwarded to xTrim() as its "approximate" flag
     * @return int whatever xTrim() returns (presumably the number of entries removed)
     */
    public function trim(int $maxLen, bool $isApproximate = false)
    {
        return $this->redis()->xTrim($this->getKeyName(), $maxLen, $isApproximate);
    }

    /**
     * Run a consumer-group operation against this queue.
     *
     * All arguments are forwarded to xGroup() together with the queue key.
     *
     * @param string $operation group sub-command; run() uses 'create'
     * @param string $group     consumer group name
     * @param string $msgId     message ID argument passed through to xGroup()
     * @param bool   $mkStream  passed through to xGroup(); presumably creates the stream if missing - TODO confirm
     * @return mixed whatever xGroup() returns
     */
    public function group($operation, $group, $msgId = '', $mkStream = false)
    {
        return $this->redis()->xGroup($operation, $this->getKeyName(), $group, $msgId, $mkStream);
    }

    /**
     * Query pending (delivered but unacknowledged) information for a consumer group.
     *
     * All arguments are forwarded unchanged to xPending() together with the queue key.
     *
     * @param string      $group    consumer group name
     * @param string|null $start    start of the ID range, if any
     * @param string|null $end      end of the ID range, if any
     * @param int|null    $count    maximum number of entries, if any
     * @param string|null $consumer restrict the result to this consumer, if given
     * @return mixed whatever xPending() returns
     */
    public function pending($group, $start = null, $end = null, $count = null, $consumer = null)
    {
        return $this->redis()->xPending($this->getKeyName(), $group, $start, $end, $count, $consumer);
    }

    /**
     * Inspect the queue.
     *
     * @param string $operation [stream: queue information, groups: consumer group information]
     * @return mixed
     */
    public function info(string $operation = 'stream')
    {
        return $this->redis()->xInfo($operation, $this->getKeyName());
    }

    /**
     * Run the message queue as a consumer. This method loops forever and never returns.
     *
     * The consumer group is created first (starting from ID '0'); the result of
     * that call is not checked. Each iteration then reads up to $count new
     * messages ('>') for the given consumer. When nothing is returned the loop
     * sleeps for one second and retries. Every message is handed to $closure as
     * ($id, $task) and is acknowledged only when the closure returns a truthy value.
     *
     * @param Closure $closure  handler invoked as $closure($id, $task)
     * @param string  $group    consumer group name
     * @param string  $consumer consumer name
     * @param int     $count    number of tasks to fetch per read
     */
    public function run(Closure $closure, string $group, string $consumer, $count = 1)
    {
        $this->group('create', $group, '0');

        while (true) {
            $tasks = $this->redis()->xReadGroup($group, $consumer, [$this->getKeyName() => '>'], $count);

            if (empty($tasks)) {
                // No tasks available: wait one second before polling again.
                sleep(1);
                continue;
            }

            foreach ($tasks[$this->getKeyName()] as $id => $task) {
                // Only acknowledge messages the handler reports as processed.
                if ($closure($id, $task)) {
                    $this->ack($group, $id);
                }
            }
        }
    }
}